From e51240a492672a9bfe1c7bd4d1d5a8580d83376e Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 30 Aug 2023 21:21:17 +0530 Subject: [PATCH] wait for stream creation instead of throwing --- packages/core/src/lib/stream_manager.ts | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/core/src/lib/stream_manager.ts b/packages/core/src/lib/stream_manager.ts index 45f918dc15..41705fa706 100644 --- a/packages/core/src/lib/stream_manager.ts +++ b/packages/core/src/lib/stream_manager.ts @@ -8,7 +8,7 @@ import debug from "debug"; export class StreamManager { private streamsPool: Map = new Map(); - private peersWithStreamCreationInProgress: Set = new Set(); + private ongoingStreamCreations: Map> = new Map(); constructor( public multicodec: string, @@ -28,10 +28,10 @@ export class StreamManager { protected async getStream(peer: Peer): Promise { const peerIdStr = peer.id.toString(); - if (this.peersWithStreamCreationInProgress.has(peerIdStr)) { - throw new Error( - `Stream creation already in progress for peer ${peerIdStr}` - ); + const ongoingCreation = this.ongoingStreamCreations.get(peerIdStr); + if (ongoingCreation) { + // Wait for the ongoing stream creation to complete + await ongoingCreation; } const peerStreams = this.streamsPool.get(peerIdStr); @@ -50,16 +50,21 @@ export class StreamManager { private async createAndSaveStream(peer: Peer): Promise { const peerIdStr = peer.id.toString(); - this.peersWithStreamCreationInProgress.add(peerIdStr); - try { + const streamCreationPromise = (async () => { const stream = await this.newStream(peer); const peerStreams = this.streamsPool.get(peerIdStr) || []; peerStreams.push(stream); this.streamsPool.set(peerIdStr, peerStreams); return stream; + })(); + + this.ongoingStreamCreations.set(peerIdStr, streamCreationPromise); + + try { + return await streamCreationPromise; } finally { - this.peersWithStreamCreationInProgress.delete(peerIdStr); + this.ongoingStreamCreations.delete(peerIdStr); } }