diff --git a/src/Client.php b/src/Client.php index f7cdd22..a247427 100644 --- a/src/Client.php +++ b/src/Client.php @@ -205,16 +205,19 @@ public function publish(string $uri, $obs, array $options = []): DisposableInter return $this->session ->takeUntil($completed->delay(1)) - ->mapTo($obs->finally(function () use ($completed) { - $completed->onNext(0); - })) - ->switchFirst() - ->map(function ($value) use ($uri, $options) { - return new PublishMessage(Utils::getUniqueId(), (object)$options, $uri, [$value]); + ->pluck(1) + ->map(function ( $webSocket) use ($obs, $completed, $uri, $options) { + return $obs + ->finally(function () use ($completed) { + $completed->onNext(0); + }) + ->map(function ($value) use ($uri, $options) { + return new PublishMessage(Utils::getUniqueId(), (object)$options, $uri, [$value]); + }) + ->do([$webSocket, 'onNext']); }) - ->subscribe(function ($value) { - $this->webSocket->onNext($value); - }); + ->switchFirst() + ->subscribe(); } public function onChallenge(callable $challengeCallback)