Skip to content

Commit

Permalink
fix: allow autosharding nodes to get peers
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Jan 17, 2024
1 parent 62725c1 commit 7a18dc3
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
27 changes: 19 additions & 8 deletions packages/tests/tests/getPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
Expand All @@ -77,12 +78,18 @@ describe("getConnectedPeersForProtocolAndShard", function () {
shards: [1]
};

const shardInfoServiceNode: ShardInfo = {
clusterId: 1,
shards: [2]
};

await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true
clusterId: shardInfoServiceNode.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfoServiceNode),
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
Expand Down Expand Up @@ -120,7 +127,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
Expand All @@ -129,7 +137,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
Expand Down Expand Up @@ -170,7 +179,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
Expand All @@ -180,7 +190,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true
lightpush: true,
relay: true
});

const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
Expand Down
16 changes: 6 additions & 10 deletions packages/tests/tests/light-push/multiple_pubsub.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2]
pubsubTopic: [autoshardingPubsubTopic2],
clusterId: shardInfo.clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
Expand Down Expand Up @@ -353,10 +354,6 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
Expand All @@ -372,11 +369,10 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()

this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
);
[nwaku, waku] = await runNodes(this, [
autoshardingPubsubTopic1,
autoshardingPubsubTopic2
]);
messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId();
});
Expand Down
6 changes: 4 additions & 2 deletions packages/tests/tests/store/multiple_pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
await nwaku.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2],
relay: true
relay: true,
clusterId
});
await nwaku.ensureSubscriptionsAutosharding([
customContentTopic1,
Expand Down Expand Up @@ -287,7 +288,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
await nwaku2.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic2],
relay: true
relay: true,
clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);

Expand Down
21 changes: 17 additions & 4 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { ShardingParams } from "@waku/interfaces";

import { bytesToUtf8 } from "../bytes/index.js";
import { decodeRelayShard } from "../common/relay_shard_codec.js";
import { contentTopicToShardIndex } from "../index.js";

/**
* Returns a pseudo-random peer that supports the given protocol.
Expand Down Expand Up @@ -89,17 +90,29 @@ export async function getConnectedPeersForProtocolAndShard(

if (supportsProtocol) {
if (shardInfo) {
//TODO: support auto-sharding
let shards;
if (!("shards" in shardInfo)) {
throw new Error(
`Connections Manager only supports static sharding for now. Autosharding is not supported.`
if (!("contentTopics" in shardInfo)) {
throw new Error(
"Missing configuration for static or auto sharding"
);
}
shards = shardInfo.contentTopics.map((topic) =>
contentTopicToShardIndex(topic)
);
} else {
shards = shardInfo.shards;
}
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
const peerShardInfo =
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);

if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) {
if (
peerShardInfo &&
shardInfo.clusterId === peerShardInfo.clusterId &&
(!("contentTopics" in shardInfo) ||
shards.some((shard) => peerShardInfo.shards.includes(shard)))
) {
return peer;
}
} else {
Expand Down

0 comments on commit 7a18dc3

Please sign in to comment.