Skip to content

Commit

Permalink
reintroduce and deprecate named sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 8, 2023
1 parent 5a4774a commit 7fc587f
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 32 deletions.
14 changes: 9 additions & 5 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -291,11 +292,14 @@ class Filter extends BaseProtocol implements IReceiver {
}

async createSubscription(
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo?: SingleShardInfo | string
): Promise<Subscription> {
const pubsubTopic = pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class LightPush extends BaseProtocol implements ILightPush {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);
}

private async preparePushMessage(
Expand Down
13 changes: 10 additions & 3 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,20 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder(
{ pubsubTopicShardInfo, contentTopic, ephemeral, metaSetter }: EncoderOptions,
{
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
ephemeral,
metaSetter
}: EncoderOptions,
autosharding = false,
clusterId = 0
): Encoder {
return new Encoder(
contentTopic,
ephemeral,
autosharding
pubsubTopic ?? autosharding
? contentTopicToPubsubTopic(contentTopic, clusterId)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down Expand Up @@ -195,12 +201,13 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopic?: PubsubTopic,
pubsubTopicShardInfo?: SingleShardInfo,
autosharding = false,
clusterId = 0
): Decoder {
return new Decoder(
autosharding
pubsubTopic ?? autosharding
? contentTopicToPubsubTopic(contentTopic, clusterId)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class Store extends BaseProtocol implements IStore {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export class WakuNode implements Waku {

constructor(
options: WakuOptions,
pubsubTopics: PubsubTopic[] = [],
libp2p: Libp2p,
pubsubShardInfo?: ShardInfo,
store?: (libp2p: Libp2p) => IStore,
Expand All @@ -64,7 +65,8 @@ export class WakuNode implements Waku {
relay?: (libp2p: Libp2p) => IRelay
) {
if (!pubsubShardInfo) {
this.pubsubTopics = [DefaultPubsubTopic];
this.pubsubTopics =
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
}
Expand Down
6 changes: 6 additions & 0 deletions packages/interfaces/src/enr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ export interface Waku2 {
export interface ShardInfo {
clusterId: number;
shards: number[];
/**
* @deprecated
*/
pubsubTopics?: string[];
contentTopics?: string[];
application?: string;
version?: string;
}

export interface IEnr extends Map<ENRKey, ENRValue> {
Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface IFilterSubscription {
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo?: SingleShardInfo | string,
peerId?: PeerId
): Promise<IFilterSubscription>;
};
4 changes: 4 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ export interface IMetaSetter {
}

export interface EncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
pubsubTopicShardInfo?: SingleShardInfo;
/** The content topic to set on outgoing messages. */
contentTopic: string;
Expand Down
6 changes: 6 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic } from "./misc.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -22,6 +23,11 @@ export interface IBaseProtocol {
}

export type ProtocolCreateOptions = {
/**
* @deprecated
* Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward.
*/
pubsubTopics?: PubsubTopic[];
/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:
Expand Down
13 changes: 10 additions & 3 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class Encoder implements IEncoder {
}

export interface EncoderOptions extends BaseEncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
/** Indicates if autosharding is enabled */
autosharding?: boolean;
/** The public key to encrypt the payload for. */
Expand All @@ -106,6 +110,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
*/
export function createEncoder({
autosharding = false,
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
publicKey,
Expand All @@ -114,7 +119,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
autosharding
pubsubTopic ?? autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down Expand Up @@ -206,11 +211,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic,
autosharding = false
): Decoder {
return new Decoder(
autosharding
typeof pubsubTopicShardInfo == "string"
? DefaultPubsubTopic
: autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down
9 changes: 6 additions & 3 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
autosharding = false,
pubsubTopicShardInfo,
contentTopic,
Expand All @@ -114,7 +115,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
autosharding
pubsubTopic ?? autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down Expand Up @@ -206,11 +207,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic,
autosharding = false
): Decoder {
return new Decoder(
autosharding
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down
8 changes: 5 additions & 3 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ class Relay implements IRelay {
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);
this.pubsubTopics = new Set(
options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: options?.pubsubTopics ?? [DefaultPubsubTopic]
);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
Expand Down
80 changes: 79 additions & 1 deletion packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import type {
Libp2pComponents,
LightNode,
ProtocolCreateOptions,
RelayNode
RelayNode,
ShardInfo
} from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
Expand All @@ -36,6 +37,68 @@ const DEFAULT_NODE_REQUIREMENTS = {

export { Libp2pComponents };

const ensureShardingConfigured = (shardInfo: ShardInfo): void => {
// Static sharding is configured if shard indexes are provided manually
if (!(shardInfo.shards && shardInfo.shards.length > 0)) {
// If not configured for static sharding, must be configured for autosharding
if (
// Autosharding is configured if at least one of the following is provided:
// 1. one or more content topics
// 2. application and version that's shared across one or more content topics
!(
(shardInfo.contentTopics && shardInfo.contentTopics.length > 0) ||
(shardInfo.application && shardInfo.version)
)
) {
throw new Error(
"Missing required configuration options for static sharding or autosharding."
);
}
}
};

/**
* Create a Waku node configured to use autosharding or static sharding.
*/
export async function createNode(
options?: ProtocolCreateOptions & WakuOptions & Partial<RelayCreateOptions>
): Promise<LightNode> {
options = options ?? {};

if (!options.shardInfo) {
throw new Error("Shard info must be set");
}

ensureShardingConfigured(options.shardInfo);

const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries());
Object.assign(libp2pOptions, { peerDiscovery });
}

const libp2p = await defaultLibp2p(
undefined,
libp2pOptions,
options?.userAgent
);

const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);

return new WakuNode(
options ?? {},
[],
libp2p,
options.shardInfo,
store,
lightPush,
filter
) as LightNode;
}

/**
* Create a Waku node that uses Waku Light Push, Filter and Store to send and
* receive messages, enabling low resource consumption.
Expand All @@ -46,6 +109,10 @@ export async function createLightNode(
): Promise<LightNode> {
options = options ?? {};

if (options.shardInfo) {
ensureShardingConfigured(options.shardInfo);
}

const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
Expand All @@ -65,6 +132,7 @@ export async function createLightNode(

return new WakuNode(
options ?? {},
options.pubsubTopics,
libp2p,
options.shardInfo,
store,
Expand All @@ -82,6 +150,10 @@ export async function createRelayNode(
): Promise<RelayNode> {
options = options ?? {};

if (options.shardInfo) {
ensureShardingConfigured(options.shardInfo);
}

const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
Expand All @@ -99,6 +171,7 @@ export async function createRelayNode(

return new WakuNode(
options,
options.pubsubTopics,
libp2p,
options.shardInfo,
undefined,
Expand Down Expand Up @@ -126,6 +199,10 @@ export async function createFullNode(
): Promise<FullNode> {
options = options ?? {};

if (options.shardInfo) {
ensureShardingConfigured(options.shardInfo);
}

const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
Expand All @@ -146,6 +223,7 @@ export async function createFullNode(

return new WakuNode(
options ?? {},
options.pubsubTopics,
libp2p,
options.shardInfo,
store,
Expand Down
Loading

0 comments on commit 7fc587f

Please sign in to comment.