Skip to content

Commit

Permalink
wait for stream creation instead of throwing
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Aug 30, 2023
1 parent 61c3373 commit e51240a
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions packages/core/src/lib/stream_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import debug from "debug";

export class StreamManager {
private streamsPool: Map<string, Stream[]> = new Map();
private peersWithStreamCreationInProgress: Set<string> = new Set();
private ongoingStreamCreations: Map<string, Promise<Stream>> = new Map();

constructor(
public multicodec: string,
Expand All @@ -28,10 +28,10 @@ export class StreamManager {
protected async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();

if (this.peersWithStreamCreationInProgress.has(peerIdStr)) {
throw new Error(
`Stream creation already in progress for peer ${peerIdStr}`
);
const ongoingCreation = this.ongoingStreamCreations.get(peerIdStr);
if (ongoingCreation) {
// Wait for the ongoing stream creation to complete
await ongoingCreation;
}

const peerStreams = this.streamsPool.get(peerIdStr);
Expand All @@ -50,16 +50,21 @@ export class StreamManager {

private async createAndSaveStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
this.peersWithStreamCreationInProgress.add(peerIdStr);

try {
const streamCreationPromise = (async () => {
const stream = await this.newStream(peer);
const peerStreams = this.streamsPool.get(peerIdStr) || [];
peerStreams.push(stream);
this.streamsPool.set(peerIdStr, peerStreams);
return stream;
})();

this.ongoingStreamCreations.set(peerIdStr, streamCreationPromise);

try {
return await streamCreationPromise;
} finally {
this.peersWithStreamCreationInProgress.delete(peerIdStr);
this.ongoingStreamCreations.delete(peerIdStr);
}
}

Expand Down

0 comments on commit e51240a

Please sign in to comment.