Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use the lowest latency peer for light protocols #1520

Closed
wants to merge 95 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
29cb6a8
set peer-exchange with default bootstrap
danisharora099 Aug 9, 2023
3646fdc
only initialise protocols with bootstrap peers
danisharora099 Aug 11, 2023
8d48380
Merge branch 'master' into feat/px-default
danisharora099 Aug 11, 2023
573fb8d
update package
danisharora099 Aug 11, 2023
4894091
Merge branch 'master' into feat/px-default
danisharora099 Aug 11, 2023
a5c1372
Merge branch 'master' of github.com:waku-org/js-waku into feat/px-def…
danisharora099 Aug 13, 2023
6c1ec84
update package-lock
danisharora099 Aug 13, 2023
a0f0668
refactor `getPeers` while setting up a protocol
danisharora099 Aug 15, 2023
f376334
move codecs to `@waku/interfaces`
danisharora099 Aug 15, 2023
f7f383b
lightpush: send messages to multiple peers
danisharora099 Aug 15, 2023
6807888
only use multiple peers for LP and Filter
danisharora099 Aug 15, 2023
f956f7f
fix: ts warnings
danisharora099 Aug 15, 2023
b43fe35
lightpush: tests pass
danisharora099 Aug 15, 2023
d712e8a
update breaking changes for new API
danisharora099 Aug 15, 2023
8bf969a
move codecs back into protocol files
danisharora099 Aug 17, 2023
3204267
refactor: `getPeers()`
danisharora099 Aug 17, 2023
645d7c1
rm: log as an arg
danisharora099 Aug 17, 2023
9d70d55
add tsdoc for getPeers
danisharora099 Aug 17, 2023
12cec4a
Merge branch 'master' into feat/px-default
danisharora099 Aug 17, 2023
773d1ef
add import
danisharora099 Aug 17, 2023
f345496
add prettier rule to eslint
danisharora099 Aug 17, 2023
f2c2fd0
Merge branch 'feat/fix-prettier-eslint' of github.com:waku-org/js-wak…
danisharora099 Aug 17, 2023
efef7a2
add: peer exchange to sdk as a dep
danisharora099 Aug 17, 2023
6a17cc3
fix eslint error
danisharora099 Aug 17, 2023
befbd37
add try catch
danisharora099 Aug 17, 2023
aa87190
Merge branch 'master' into feat/px-default
danisharora099 Aug 21, 2023
83d5103
revert unecessary diff
danisharora099 Aug 21, 2023
63796db
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Aug 21, 2023
9583bda
revert unecessary diff
danisharora099 Aug 21, 2023
6082ffb
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Aug 21, 2023
c63d146
fix imports
danisharora099 Aug 21, 2023
6fdcf61
convert relaycodecs to array
danisharora099 Aug 22, 2023
916af18
Merge branch 'master' into feat/px-default
danisharora099 Aug 22, 2023
5dff864
remove: peerId as an arg for protocol methods
danisharora099 Aug 29, 2023
f75b2d9
keep peerId as an arg for peer-exchange
danisharora099 Aug 29, 2023
7d14956
remove: peerId from getPeers()
danisharora099 Aug 29, 2023
1c93c12
lightpush: extract hardcoded numPeers as a constant
danisharora099 Aug 29, 2023
016965b
Merge branch 'master' of github.com:waku-org/js-waku into feat/px-def…
danisharora099 Aug 29, 2023
04b9bb5
return all peers if numPeers is 0 and increase readability for random…
danisharora099 Aug 29, 2023
1f81ae1
refactor considering more than 1 bootstrap peers can exist
danisharora099 Aug 29, 2023
dce7b3b
use `getPeers`
danisharora099 Aug 29, 2023
a376bdd
change arg for `getPeers` to object
danisharora099 Aug 29, 2023
ec8a1a1
store pings for peers & create getter
danisharora099 Aug 31, 2023
ec3bcc8
address comments
danisharora099 Aug 31, 2023
c697c15
Merge branch 'master' of github.com:waku-org/js-waku into feat/px-def…
danisharora099 Aug 31, 2023
1b0e1fe
refactor tests for new API
danisharora099 Aug 31, 2023
c7fca3e
lightpush: make constant the class variable
danisharora099 Aug 31, 2023
60c6e05
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Aug 31, 2023
57fbfd2
make `KeepAliveManager` a singleton
danisharora099 Sep 1, 2023
3a50420
use the peer with the lowest latency for protocols
danisharora099 Sep 1, 2023
40e0b94
introduce tests for `selectPeerForProtocol`
danisharora099 Sep 1, 2023
97dff59
add more tests for util & install `chai-as-promised`
danisharora099 Sep 1, 2023
acf078d
update package-lock
danisharora099 Sep 1, 2023
e98e3c6
rm: only for utils test
danisharora099 Sep 1, 2023
191355b
use `maxBootstrapPeers` instead of `includeBootstrap`
danisharora099 Sep 1, 2023
6820ebe
refactor protocols for new API
danisharora099 Sep 1, 2023
e968d4d
add tests for `getPeers`
danisharora099 Sep 1, 2023
401265b
skip getPeers test
danisharora099 Sep 2, 2023
1b91278
rm: only from test
danisharora099 Sep 2, 2023
c421732
fix: breaking circular dep error
danisharora099 Sep 2, 2023
1a8d3a5
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Sep 2, 2023
75ee463
Merge branch 'master' of github.com:waku-org/js-waku into feat/px-def…
danisharora099 Sep 4, 2023
56d8b84
move tests to `base_protocol.spec.ts`
danisharora099 Sep 5, 2023
dcb4fbf
break down `getPeers` into a `filter` method
danisharora099 Sep 5, 2023
2cd8fa0
return all bootstrap peers if arg is 0
danisharora099 Sep 5, 2023
ad518af
refactor test without stubbing
danisharora099 Sep 5, 2023
d29ecab
Merge branch 'master' of github.com:waku-org/js-waku into feat/select…
danisharora099 Sep 5, 2023
9097564
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Sep 5, 2023
6847766
readd chai-as-promised
danisharora099 Sep 5, 2023
f24e8c0
address comments
danisharora099 Sep 5, 2023
7354b09
update test title
danisharora099 Sep 5, 2023
c1d2e1a
move `filterPeers` to a separate file
danisharora099 Sep 5, 2023
919f10b
address comments & add more test
danisharora099 Sep 5, 2023
e5f1331
Merge branch 'master' into feat/px-default
danisharora099 Sep 5, 2023
6a1a4ad
make test title more verbose
danisharora099 Sep 5, 2023
a6cad7a
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Sep 5, 2023
0f629cb
address comments
danisharora099 Sep 5, 2023
9302a86
remove ProtocolOptions
danisharora099 Sep 5, 2023
50df227
Merge branch 'master' into feat/px-default
danisharora099 Sep 5, 2023
053c9da
chore: refactor tests for new API
danisharora099 Sep 5, 2023
5aa4fcb
add defaults for getPeers
danisharora099 Sep 5, 2023
afbcaae
address comments
danisharora099 Sep 5, 2023
78114c7
Merge branch 'master' of github.com:waku-org/js-waku into feat/px-def…
danisharora099 Sep 5, 2023
ee659cd
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Sep 5, 2023
1002963
minor improvements
danisharora099 Sep 5, 2023
2dabefc
peer pings: pass the hashmap in args instead of the function
danisharora099 Sep 5, 2023
d28647a
update tests for the new API
danisharora099 Sep 5, 2023
d44670d
rm unneeded comment
danisharora099 Sep 5, 2023
2e907ea
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Sep 5, 2023
9439bb3
address comment: add diversity of node tags to test
danisharora099 Sep 7, 2023
1e2a9d2
address comments
danisharora099 Sep 7, 2023
7e49625
Merge branch 'master' of github.com:waku-org/js-waku into feat/px-def…
danisharora099 Sep 7, 2023
c90f88b
fix: imports
danisharora099 Sep 7, 2023
518f9c6
Merge branch 'feat/px-default' of github.com:waku-org/js-waku into fe…
danisharora099 Sep 7, 2023
7840a78
merge
danisharora099 Sep 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";

import { filterPeers } from "./filterPeers.js";
import { KeepAliveManager } from "./keep_alive_manager.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand Down Expand Up @@ -54,8 +55,10 @@ export class BaseProtocol implements IBaseProtocol {
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peerPings } = KeepAliveManager.getInstance();
const { peer } = await selectPeerForProtocol(
this.peerStore,
peerPings,
[this.multicodec],
peerId
);
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ export class ConnectionManager
...options
};

this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay);
this.keepAliveManager = KeepAliveManager.createInstance(
this.libp2p.services.ping,
keepAliveOptions,
relay
);

this.run()
.then(() => log(`Connection Manager is now running`))
Expand Down Expand Up @@ -340,7 +344,7 @@ export class ConnectionManager
void (async () => {
const peerId = evt.detail;

this.keepAliveManager.start(peerId, this.libp2p.services.ping);
this.keepAliveManager.start(peerId);

const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
Expand Down
52 changes: 46 additions & 6 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,59 @@ import type { KeepAliveOptions } from "@waku/interfaces";
import debug from "debug";
import type { PingService } from "libp2p/ping";

import { createEncoder } from "../index.js";
import { createEncoder } from "./message/version_0.js";
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while unrelated to this PR, this was causing a cyclic dependency error that surfaced during this PR


export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");

export class KeepAliveManager {
private static instance: KeepAliveManager;
Copy link
Collaborator Author

@danisharora099 danisharora099 Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

converted this into a singleton for easier access (also consistent with ConnectionManager)


private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;
private options: KeepAliveOptions;
private relay?: IRelay;
private libp2pPing: PingService;
public peerPings: Map<string, number>;

constructor(options: KeepAliveOptions, relay?: IRelay) {
private constructor(
libp2pPing: PingService,
options: KeepAliveOptions,
relay?: IRelay
) {
this.pingKeepAliveTimers = new Map();
this.relayKeepAliveTimers = new Map();
this.options = options;
this.relay = relay;
this.peerPings = new Map();
this.libp2pPing = libp2pPing;
}

public static createInstance(
libp2pPing: PingService,
options: KeepAliveOptions,
relay?: IRelay
): KeepAliveManager {
if (!KeepAliveManager.instance) {
KeepAliveManager.instance = new KeepAliveManager(
libp2pPing,
options,
relay
);
}
return KeepAliveManager.instance;
}

public static getInstance(): KeepAliveManager {
if (!KeepAliveManager.instance) {
throw new Error(
"KeepAliveManager not initialized - please use createInstance() first"
);
}
return KeepAliveManager.instance;
}

public start(peerId: PeerId, libp2pPing: PingService): void {
public start(peerId: PeerId): void {
// Just in case a timer already exist for this peer
this.stop(peerId);

Expand All @@ -33,9 +67,15 @@ export class KeepAliveManager {

if (pingPeriodSecs !== 0) {
const interval = setInterval(() => {
libp2pPing.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
this.libp2pPing
.ping(peerId)
.then((ping) => {
log(`Ping succeeded (${peerIdStr})`, ping);
this.peerPings.set(peerIdStr, ping);
})
.catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
}, pingPeriodSecs * 1000);
this.pingKeepAliveTimers.set(peerIdStr, interval);
}
Expand Down
8 changes: 5 additions & 3 deletions packages/tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@waku/interfaces": "*",
"@waku/utils": "*",
"app-root-path": "^3.1.0",
"chai-as-promised": "^7.1.1",
"debug": "^4.3.4",
"dockerode": "^3.3.5",
"p-timeout": "^6.1.0",
Expand All @@ -66,20 +67,21 @@
},
"devDependencies": {
"@libp2p/bootstrap": "^9.0.2",
"@types/sinon": "^10.0.16",
"@types/chai": "^4.3.5",
"@types/dockerode": "^3.3.19",
"@types/mocha": "^10.0.1",
"@types/sinon": "^10.0.16",
"@types/tail": "^2.2.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^6.6.0",
"@waku/sdk": "*",
"@waku/dns-discovery": "*",
"@waku/message-encryption": "*",
"@waku/peer-exchange": "*",
"@waku/sdk": "*",
"aegir": "^40.0.11",
"chai": "^4.3.7",
"datastore-core": "^9.2.2",
"cspell": "^7.3.2",
"datastore-core": "^9.2.2",
"debug": "^4.3.4",
"interface-datastore": "^8.2.3",
"libp2p": "^0.46.8",
Expand Down
134 changes: 132 additions & 2 deletions packages/tests/tests/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createDecoder,
createEncoder,
Expand All @@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { selectPeerForProtocol } from "@waku/utils/libp2p";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import sinon from "sinon";

import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { NimGoNode } from "../src/node/node.js";

chai.use(chaiAsPromised);

const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
Expand Down Expand Up @@ -106,3 +114,125 @@ describe("Util: toAsyncIterator: Filter", () => {
expect(result.done).to.eq(true);
});
});

const TestCodec = "test/1";

describe("selectPeerForProtocol", () => {
let peerStore: PeerStore;
let peerPings: Map<string, number>;
const protocols = [TestCodec];

beforeEach(async function () {
this.timeout(10000);
const waku = await createLightNode();
await waku.start();
await delay(3000);
peerStore = waku.libp2p.peerStore;
peerPings = new Map();
});

afterEach(() => {
sinon.restore();
});

it("should return the peer with the lowest ping", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();

const mockPeers = [
{ id: peer1, protocols: [TestCodec] },
{ id: peer2, protocols: [TestCodec] },
{ id: peer3, protocols: [TestCodec] }
] as Peer[];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

peerPings.set(peer1.toString(), 500);
peerPings.set(peer2.toString(), 1000);
peerPings.set(peer3.toString(), 100);

const result = await selectPeerForProtocol(peerStore, peerPings, protocols);

expect(result.peer).to.deep.equal(mockPeers[2]);
expect(result.protocol).to.equal(TestCodec);
});

it("should return the peer with the provided peerId", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

const result = await selectPeerForProtocol(
peerStore,
peerPings,
protocols,
targetPeer
);
expect(result.peer).to.deep.equal(mockPeer);
});

it("should return a random peer when all peers have the same latency", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();

const mockPeers = [
{ id: peer1, protocols: [TestCodec] },
{ id: peer2, protocols: [TestCodec] },
{ id: peer3, protocols: [TestCodec] }
] as Peer[];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

peerPings.set(peer1.toString(), 500);
peerPings.set(peer2.toString(), 500);
peerPings.set(peer3.toString(), 500);

const result = await selectPeerForProtocol(peerStore, peerPings, protocols);

expect(mockPeers).to.deep.include(result.peer);
});

it("should throw an error when no peer matches the given protocols", async function () {
const mockPeers = [
{ id: await createSecp256k1PeerId(), protocols: ["DifferentCodec"] },
{
id: await createSecp256k1PeerId(),
protocols: ["AnotherDifferentCodec"]
}
] as Peer[];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

await expect(
selectPeerForProtocol(peerStore, peerPings, protocols)
).to.be.rejectedWith(
`Failed to find known peer that registers protocols: ${protocols}`
);
});

it("should throw an error when the selected peer does not register the required protocols", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

await expect(
selectPeerForProtocol(peerStore, peerPings, protocols, targetPeer)
).to.be.rejectedWith(
`Peer does not register required protocols (${targetPeer.toString()}): ${protocols}`
);
});
});
Loading