Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lightPush): improve peer usage and improve readability #2155

Merged
merged 12 commits into from
Oct 4, 2024
Merged
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"p-event": "^6.0.1",
"uint8arraylist": "^2.4.3",
"uuid": "^9.0.0"
},
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";

export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

export { ConnectionManager } from "./lib/connection_manager.js";

Expand Down
29 changes: 28 additions & 1 deletion packages/core/src/lib/stream_manager/stream_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,32 @@ describe("StreamManager", () => {
}
});

it("should return different streams if requested simultaniously", async () => {
const con1 = createMockConnection();
con1.streams = [createMockStream({ id: "1", protocol: MULTICODEC })];

const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);

con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const [stream1, stream2] = await Promise.all([
streamManager.getStream(mockPeer),
streamManager.getStream(mockPeer)
]);

const expected = ["1", "2"].toString();
const actual = [stream1.id, stream2.id].sort().toString();

expect(actual).to.be.eq(expected);
});

it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
Expand Down Expand Up @@ -156,6 +182,7 @@ function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
writeStatus: options.writeStatus || "ready",
metadata: {}
} as Stream;
}
27 changes: 23 additions & 4 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { Logger } from "@waku/utils";

import { selectOpenConnection } from "./utils.js";

const STREAM_LOCK_KEY = "consumed";

export class StreamManager {
private readonly log: Logger;

Expand All @@ -29,16 +31,20 @@ export class StreamManager {
await scheduledStream;
}

const stream = this.getOpenStreamForCodec(peer.id);
let stream = this.getOpenStreamForCodec(peer.id);

if (stream) {
this.log.info(
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
this.lockStream(peer.id.toString(), stream);
return stream;
}

return this.createStream(peer);
stream = await this.createStream(peer);
this.lockStream(peer.id.toString(), stream);

return stream;
}

private async createStream(peer: Peer, retries = 0): Promise<Stream> {
Expand Down Expand Up @@ -142,13 +148,26 @@ export class StreamManager {
(s) => s.protocol === this.multicodec
);

if (!stream) {
return;
}

const isStreamUnusable = ["done", "closed", "closing"].includes(
stream?.writeStatus || ""
stream.writeStatus || ""
);
if (isStreamUnusable) {
if (isStreamUnusable || this.isStreamLocked(stream)) {
return;
}

return stream;
}

private lockStream(peerId: string, stream: Stream): void {
this.log.info(`Locking stream for peerId:${peerId}\tstreamId:${stream.id}`);
stream.metadata[STREAM_LOCK_KEY] = true;
}

private isStreamLocked(stream: Stream): boolean {
return !!stream.metadata[STREAM_LOCK_KEY];
}
}
2 changes: 1 addition & 1 deletion packages/interfaces/src/light_push.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
import type { ISender } from "./sender.js";

export type ILightPushSDK = ISender &
export type ILightPush = ISender &
weboko marked this conversation as resolved.
Show resolved Hide resolved
IBaseProtocolSDK & { protocol: IBaseProtocolCore };
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export type ProtocolCreateOptions = {
* This is used by:
* - Light Push to send messages,
* - Filter to retrieve messages.
* Defaults to 3.
* Defaults to 2.
*/
numPeersToUse?: number;
/**
Expand Down
6 changes: 3 additions & 3 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStoreSDK } from "./store.js";
Expand All @@ -15,7 +15,7 @@ export interface Waku {
relay?: IRelay;
store?: IStoreSDK;
filter?: IFilterSDK;
lightPush?: ILightPushSDK;
lightPush?: ILightPush;

connectionManager: IConnectionManager;

Expand All @@ -36,7 +36,7 @@ export interface LightNode extends Waku {
relay: undefined;
store: IStoreSDK;
filter: IFilterSDK;
lightPush: ILightPushSDK;
lightPush: ILightPush;
}

export interface RelayNode extends Waku {
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"libp2p": "^1.8.1"
"libp2p": "^1.8.1",
"p-event": "^6.0.1"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
export { createEncoder, createDecoder } from "@waku/core";
export {
DecodedMessage,
Decoder,
Expand All @@ -14,10 +14,12 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuLightPush } from "./protocols/light_push/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store/index.js";

export { waitForRemotePeer } from "./wait_for_remote_peer.js";

export * as waku from "@waku/core";
export * as utils from "@waku/utils";
export * from "@waku/interfaces";
4 changes: 2 additions & 2 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ interface Options {
}

const RENEW_TIME_LOCK_DURATION = 30 * 1000;
const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
private healthManager: IHealthManager;
protected healthManager: IHealthManager;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/src/protocols/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { wakuLightPush } from "./light_push.js";
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCore } from "@waku/core";
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCodec, LightPushCore } from "@waku/core";
import {
Failure,
type IEncoder,
ILightPushSDK,
ILightPush,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
Expand All @@ -19,14 +19,14 @@ import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;

private readonly reliabilityMonitor: SenderReliabilityMonitor;

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
Expand All @@ -49,11 +49,6 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
message: IMessage,
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options are not being accounted for in send anymore

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly! as I mentioned here it will be follow up

#2137 (comment)

const options = {
autoRetry: true,
..._options
} as ProtocolUseOptions;

const successes: PeerId[] = [];
const failures: Failure[] = [];

Expand All @@ -63,17 +58,17 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
} catch (error) {
log.error("Failed to send waku light push: pubsub topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
],
successes: []
]
};
}

const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
const peers = await this.getConnectedPeers();
if (peers.length === 0) {
return {
successes,
failures: [
Expand All @@ -84,53 +79,75 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
};
}

const sendPromises = this.connectedPeers.map((peer) =>
this.protocol.send(encoder, message, peer)
const results = await Promise.allSettled(
peers.map((peer) => this.protocol.send(encoder, message, peer))
);

const results = await Promise.allSettled(sendPromises);

for (const result of results) {
if (result.status === "fulfilled") {
const { failure, success } = result.value;
if (success) {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}
}
} else {
if (result.status !== "fulfilled") {
log.error("Failed unexpectedly while sending:", result.reason);
failures.push({ error: ProtocolError.GENERIC_FAIL });
continue;
}

const { failure, success } = result.value;

if (success) {
successes.push(success);
continue;
}

if (failure) {
failures.push(failure);

const connectedPeer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);

if (connectedPeer) {
void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id,
() => this.protocol.send(encoder, message, connectedPeer)
);
Comment on lines +108 to +110
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we test this?
So seems like we are not relying on PeerManager anymore to retrieve peers to be used for the protocols: moved from hasPeers() which relies on PeerManager -> getConnectedPeers() which gets all available connections, I'm curious how renewing peers would affect management. Wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is something that was there so I assume we should have tests there

for connections - yes, we move away but not from hasPeers but form BaseProtocolSDK.connectedPeers that proved to be out of sync quite often

the reason for it is:

  • to simplify process for LightPush
  • alight with status-go usage of LightPush that proved to be reliable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe hasPeers would work with tests. Considering if it would be required to double check with getConnectedPeers, especially as we do renewals and what not

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseProtocolSDK.connectedPeers() was being inconsistent because of race conditions in shared peer management, which isn't the case with #2137

}
}
}

this.healthManager.updateProtocolHealth(LightPushCodec, successes.length);

return {
successes,
failures
};
}

private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

if (peerIDs.length === 0) {
return [];
}

const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);

return peers
.filter((p) => !!p)
.filter((p) => (p as Peer).protocols.includes(LightPushCodec))
.slice(0, this.numPeersToUse) as Peer[];
}
}

export function wakuLightPush(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => ILightPushSDK {
return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init);
): (libp2p: Libp2p) => ILightPush {
return (libp2p: Libp2p) => new LightPush(connectionManager, libp2p, init);
}
Loading
Loading