Skip to content

Commit

Permalink
moar improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Aug 30, 2023
1 parent e51240a commit d6ebcab
Showing 1 changed file with 49 additions and 27 deletions.
76 changes: 49 additions & 27 deletions packages/core/src/lib/stream_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,33 @@ import debug from "debug";
export class StreamManager {
private streamsPool: Map<string, Stream[]> = new Map();
private ongoingStreamCreations: Map<string, Promise<Stream>> = new Map();
private readonly MAX_STREAMS_PER_PEER = 3;
private readonly MAX_RETRIES = 3;

constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
private log: debug.Debugger
) {}

private async newStream(peer: Peer): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
private async newStream(
peer: Peer,
retries = this.MAX_RETRIES
): Promise<Stream> {
try {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
} catch (error) {
if (retries > 0) {
this.log(`Retrying stream creation. Retries left: ${retries}`);
return this.newStream(peer, retries - 1);
}
throw error;
}
return connection.newStream(this.multicodec);
}

protected async getStream(peer: Peer): Promise<Stream> {
Expand All @@ -34,8 +47,8 @@ export class StreamManager {
await ongoingCreation;
}

const peerStreams = this.streamsPool.get(peerIdStr);
if (peerStreams && peerStreams.length > 0) {
const peerStreams = this.streamsPool.get(peerIdStr) || [];
if (peerStreams.length > 0) {
const stream = peerStreams.pop();
if (!stream) {
throw new Error("Failed to get a stream from the pool");
Expand Down Expand Up @@ -69,36 +82,45 @@ export class StreamManager {
}

private replenishStreamPool(peer: Peer): void {
this.createAndSaveStream(peer)
.then(() => {
this.log(`Replenished stream pool for peer ${peer.id.toString()}`);
})
.catch((err) => {
this.log(
`Error replenishing stream pool for peer ${peer.id.toString()}: ${err}`
);
});
}
const peerIdStr = peer.id.toString();
const ongoingCreationsCount = this.ongoingStreamCreations.has(peerIdStr)
? 1
: 0;
const availableStreamsCount = (this.streamsPool.get(peerIdStr) || [])
.length;

protected handlePeerUpdateStreamPool(evt: CustomEvent<PeerUpdate>): void {
const peer = evt.detail.peer;
if (peer.protocols.includes(this.multicodec)) {
this.streamsPool.set(peer.id.toString(), []);
this.log(`Optimistically opening a stream to ${peer.id.toString()}`);
if (
ongoingCreationsCount + availableStreamsCount <
this.MAX_STREAMS_PER_PEER
) {
this.createAndSaveStream(peer)
.then(() => {
this.log(
`Optimistic stream opening succeeded for ${peer.id.toString()}`
);
this.log(`Replenished stream pool for peer ${peer.id.toString()}`);
})
.catch((err) => {
this.log(`Error during optimistic stream opening: ${err}`);
this.log(
`Error replenishing stream pool for peer ${peer.id.toString()}: ${err}`
);
});
}
}

protected handlePeerUpdateStreamPool(evt: CustomEvent<PeerUpdate>): void {
const peer = evt.detail.peer;
if (peer.protocols.includes(this.multicodec)) {
const peerIdStr = peer.id.toString();
if (!this.streamsPool.has(peerIdStr)) {
this.streamsPool.set(peerIdStr, []);
}
this.log(`Optimistically opening a stream to ${peer.id.toString()}`);
this.replenishStreamPool(peer);
}
}

protected handlePeerDisconnectStreamPool(evt: CustomEvent<PeerId>): void {
const peerId = evt.detail;
this.streamsPool.delete(peerId.toString());
// Cancel ongoing stream creation if any
this.ongoingStreamCreations.delete(peerId.toString());
}
}

0 comments on commit d6ebcab

Please sign in to comment.