Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve peer manager and re-integrate to light push #2191

Merged
merged 27 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c93b58e
up lock
weboko Oct 19, 2024
7efbd26
make ConnectionManager use ctor
weboko Oct 19, 2024
a6dd499
reform connection manager configurations
weboko Oct 19, 2024
e897d5c
remove log param from peerManager
weboko Oct 20, 2024
e1813bc
make PeerManager use only ConnectionManager, move getPeers to Connect…
weboko Oct 20, 2024
9bdc2af
remove allPeers and connectedPeers from BaseProtocolCore, update test…
weboko Oct 20, 2024
2edd856
use only one peerManager from Waku object
weboko Oct 22, 2024
7ae3b91
remove IBaseProtocolSDK and merge with PeerManager
weboko Oct 22, 2024
d0c6905
re-implement peerManager, remove ProtocolUseOptions
weboko Oct 23, 2024
1eaedb3
merge with master
weboko Oct 23, 2024
984326b
remove not needed test, up lock
weboko Oct 23, 2024
8a3337d
update deps and lock
weboko Oct 24, 2024
d4210c7
remove old test for peerManager, fix check and spell
weboko Oct 24, 2024
7258a01
merge with master
weboko Jan 24, 2025
7f4033d
merge with master
weboko Jan 28, 2025
994d05e
rename to getConnectedPeers
weboko Jan 28, 2025
1be6e2d
feat: improve filter subscriptions (#2193)
weboko Jan 29, 2025
ebb00f4
up lock
weboko Jan 29, 2025
b6ac054
Merge branch 'master' of github.com:waku-org/js-waku into weboko/peer…
weboko Jan 29, 2025
b1f4adf
merge with master
weboko Jan 30, 2025
93eebfa
make peer retrieval probabilistic
weboko Jan 30, 2025
070b5a4
add comments
weboko Jan 30, 2025
38067b3
up lightpush tests
weboko Jan 30, 2025
fb9556a
add tests for peer_manager, improve folder structure
weboko Jan 30, 2025
e84d0db
create named files for protocols
weboko Jan 30, 2025
8aac817
create named files, simplify project structure
weboko Jan 30, 2025
da1ffe6
remove only
weboko Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"upgrader",
"vacp",
"varint",
"weboko",
"waku",
"wakuconnect",
"wakunode",
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
browser:
runs-on: ubuntu-latest
container:
image: mcr.microsoft.com/playwright:v1.48.0-jammy
image: mcr.microsoft.com/playwright:v1.50.0-jammy
env:
HOME: "/root"
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/playwright.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
timeout-minutes: 60
runs-on: ubuntu-latest
container:
image: mcr.microsoft.com/playwright:v1.48.0-jammy
image: mcr.microsoft.com/playwright:v1.50.0-jammy
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
Expand Down
6,225 changes: 3,408 additions & 2,817 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/browser-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"test": "npx playwright test"
},
"devDependencies": {
"@playwright/test": "^1.48.1",
"@playwright/test": "^1.50.0",
"@waku/create-app": "^0.1.1-504bcd4",
"dotenv-flow": "^4.1.0",
"serve": "^14.2.3"
Expand Down
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
},
"devDependencies": {
"@libp2p/peer-id": "^5.0.1",
"@libp2p/interface": "^2.1.3",
"@multiformats/multiaddr": "^12.0.0",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

export { ConnectionManager } from "./lib/connection_manager.js";
export { ConnectionManager } from "./lib/connection_manager/index.js";

export { getHealthManager } from "./lib/health_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
weboko marked this conversation as resolved.
Show resolved Hide resolved
export { StreamManager } from "./lib/stream_manager/index.js";

export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
73 changes: 0 additions & 73 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import type {
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";

/**
Expand All @@ -23,7 +20,6 @@ export class BaseProtocol implements IBaseProtocolCore {
protected constructor(
public multicodec: string,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
Expand All @@ -45,73 +41,4 @@ export class BaseProtocol implements IBaseProtocolCore {
protected async getStream(peer: Peer): Promise<Stream> {
return this.streamManager.getStream(peer);
}

/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.components.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still have access to this utility function from elsewhere?
Basically getting the connected peers for a particular protocol

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in ConnectionManager there is method implemented for it
public async getConnectedPeers(codec?: string): Promise<Peer[]>

Copy link
Collaborator

@danisharora099 danisharora099 Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

easier to do
waku.lightpush.connectedPeers()
vs
waku.connectionManager.getPeers(waku.lightpush.multicodec)

im in favor of not removing this
also is a breaking change

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why waku.lightpush.connectedPeers() even needed?
I would assume to be used on the consumer side - but then it's something that we don't expect to be used

whereas waku.connectionManager.getPeers(waku.lightpush.multicodec) is for internal usage for protocols and shouldn't be used outside

also is a breaking change

agree, this PR is pretty much breaking change as we found out previous things were not working well enough

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have had feedback in the past that convinces us that consumers want to be able to check the information of connected peers on the protocol (cc @vpavlin)
we can add it on the SDK layer instead of the core layer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can continue to use ConnectionManager to provide this API - do you see reason in switching out the use of multicodecs as arg for the function, with the enum type for protocols instead to help DX? (ref: #2191)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have had feedback in the past that convinces us that consumers want to be able to check the information

that should be tackled by health state of the node, otherwise I don't see a need for consumers for do waku.lightPush.connectedPeers() - if that is something they need to do we can tell them:

  • do waku.getConnectedPeers().filter(byMulticodec);
  • expose connectionManager so that they can do the same from it;
    Back then this functionality was added without clear need.

do you see reason in switching out the use of multicodecs as arg for the function, with the enum type for protocols instead to help DX?

as I explained it in the other comment - connectionManager.getPeers(multicodec) - will return peers for metadata and other protocols where as Protocols is for consumers use and has only LightPush/Filter etc.


Just to clarify here - let's not have unnecessary entry points and not needed code. This will be easier for us as to support, bug fix and improve our own code base.
I see us adding one method that does the job and then improving it if we find more evidence.
Of course I might be wrong as to this particular case, but then we can add it once requested.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danisharora099 let me know what you think.

const peers = await this.allPeers();
return peers.filter((peer) => {
const connections = this.components.connectionManager.getConnections(
peer.id
);
return connections.length > 0;
});
}

/**
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();

// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.components.peerStore,
filteredPeers
);

if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}

if (sortedFilteredPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
);
}

return sortedFilteredPeers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
IConnectionStateEvents,
IPeersByDiscoveryEvents,
IRelay,
KeepAliveOptions,
PeersByDiscoveryResult,
PubsubTopic,
ShardInfo
Expand All @@ -29,23 +28,36 @@
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";
import { getPeerPing } from "./utils.js";

const log = new Logger("connection-manager");

export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
const DEFAULT_MAX_PARALLEL_DIALS = 3;

const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60;
const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60;

type ConnectionManagerConstructorOptions = {
libp2p: Libp2p;
pubsubTopics: PubsubTopic[];
relay?: IRelay;
config?: Partial<ConnectionManagerOptions>;
};

export class ConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents>
implements IConnectionManager
{
private static instances = new Map<string, ConnectionManager>();
// TODO(weboko): make it private
public readonly pubsubTopics: PubsubTopic[];

private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 60 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 60 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
Expand All @@ -60,29 +72,6 @@
return this.isP2PNetworkConnected;
}

public static create(
peerId: string,
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
pubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: ConnectionManagerOptions
): ConnectionManager {
let instance = ConnectionManager.instances.get(peerId);
if (!instance) {
instance = new ConnectionManager(
libp2p,
keepAliveOptions,
pubsubTopics,
relay,
options
);
ConnectionManager.instances.set(peerId, instance);
}

return instance;
}

public stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
Expand Down Expand Up @@ -165,27 +154,26 @@
};
}

private constructor(
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
public readonly configuredPubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {
public constructor(options: ConnectionManagerConstructorOptions) {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
super();
this.libp2p = libp2p;
this.configuredPubsubTopics = configuredPubsubTopics;
this.libp2p = options.libp2p;
this.pubsubTopics = options.pubsubTopics;
this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
maxParallelDials: DEFAULT_MAX_PARALLEL_DIALS,
...options
pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC,
relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC,
...options.config
};

this.keepAliveManager = new KeepAliveManager({
relay,
libp2p,
options: keepAliveOptions
relay: options.relay,
libp2p: options.libp2p,
options: {
pingKeepAlive: this.options.pingKeepAlive,
relayKeepAlive: this.options.relayKeepAlive
}
});

this.startEventListeners()
Expand All @@ -202,6 +190,29 @@
);
}

public async getConnectedPeers(codec?: string): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

if (peerIDs.length === 0) {
return [];
}

const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);

return peers
.filter((p) => !!p)
.filter((p) => (codec ? (p as Peer).protocols.includes(codec) : true))
.sort((left, right) => getPeerPing(left) - getPeerPing(right)) as Peer[];
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
}

private async dialPeerStorePeers(): Promise<void> {
const peerInfos = await this.libp2p.peerStore.all();
const dialPromises = [];
Expand Down Expand Up @@ -304,7 +315,7 @@
} else {
// Handle generic error
log.error(
`Error dialing peer ${peerIdStr} - ${(error as any).message}`

Check warning on line 318 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 318 in packages/core/src/lib/connection_manager/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
);
}
this.dialErrorsForPeer.set(peerIdStr, error);
Expand Down Expand Up @@ -572,7 +583,7 @@

log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubsubTopics
this.pubsubTopics
}).
Not dialing.`
);
Expand Down Expand Up @@ -667,7 +678,7 @@
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);

const isTopicConfigured = pubsubTopics.some((topic) =>
this.configuredPubsubTopics.includes(topic)
this.pubsubTopics.includes(topic)
);
return isTopicConfigured;
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/lib/connection_manager/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { ConnectionManager } from "./connection_manager.js";
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import type { PeerId } from "@libp2p/interface";
import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";

import { createEncoder } from "./message/version_0.js";
import { createEncoder } from "../message/version_0.js";

export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = new Logger("keep-alive");

type KeepAliveOptions = {
pingKeepAlive: number;
relayKeepAlive: number;
};

type CreateKeepAliveManagerOptions = {
options: KeepAliveOptions;
libp2p: Libp2p;
Expand Down
25 changes: 25 additions & 0 deletions packages/core/src/lib/connection_manager/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { Peer } from "@libp2p/interface";
import { bytesToUtf8 } from "@waku/utils/bytes";

/**
* Reads peer's metadata and retrieves ping value.
* @param peer Peer or null
* @returns -1 if no ping attached, otherwise returns ping value
*/
export const getPeerPing = (peer: Peer | null): number => {
if (!peer) {
return -1;
}

try {
const bytes = peer.metadata.get("ping");

if (!bytes) {
return -1;
}

return Number(bytesToUtf8(bytes));
} catch (e) {
return -1;
}
};
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics);
super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics);

libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
Expand Down
Loading
Loading