Skip to content

Commit

Permalink
reintroduce and deprecate named sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 authored and adklempner committed Jan 2, 2024
1 parent 0d534e3 commit e7fe3ff
Show file tree
Hide file tree
Showing 21 changed files with 939 additions and 58 deletions.
14 changes: 9 additions & 5 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -291,11 +292,14 @@ class Filter extends BaseProtocol implements IReceiver {
}

async createSubscription(
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo?: SingleShardInfo | string
): Promise<Subscription> {
const pubsubTopic = pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class LightPush extends BaseProtocol implements ILightPush {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);
}

private async preparePushMessage(
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder({
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
ephemeral,
Expand All @@ -126,7 +127,7 @@ export function createEncoder({
return new Encoder(
contentTopic,
ephemeral,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
metaSetter
);
}
Expand Down Expand Up @@ -186,7 +187,7 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class Store extends BaseProtocol implements IStore {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class WakuNode implements Waku {

constructor(
options: WakuOptions,
pubsubTopics: PubsubTopic[] = [],
libp2p: Libp2p,
pubsubShardInfo?: ShardingParams,
store?: (libp2p: Libp2p) => IStore,
Expand All @@ -63,7 +64,8 @@ export class WakuNode implements Waku {
relay?: (libp2p: Libp2p) => IRelay
) {
if (!pubsubShardInfo) {
this.pubsubTopics = [DefaultPubsubTopic];
this.pubsubTopics =
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface/peer-id";

import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js";
import type { ContentTopic } from "./misc.js";
import type { ContentTopic, PubsubTopic } from "./misc.js";
import type { Callback, IBaseProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";

Expand All @@ -25,7 +25,7 @@ export interface IFilterSubscription {
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic,
peerId?: PeerId
): Promise<IFilterSubscription>;
};
4 changes: 4 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export interface IMetaSetter {
}

export interface EncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
pubsubTopicShardInfo?: SingleShardInfo;
/** The content topic to set on outgoing messages. */
contentTopic: string;
Expand Down
14 changes: 13 additions & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic } from "./misc.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -26,9 +27,20 @@ export type ContentTopicInfo = {
contentTopics: string[];
};

export type ShardingParams = ShardInfo | ContentTopicInfo;
export type ApplicationInfo = {
clusterId: number;
application: string;
version: string;
};

export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;

export type ProtocolCreateOptions = {
/**
* @deprecated
* Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward.
*/
pubsubTopics?: PubsubTopic[];
/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:
Expand Down
28 changes: 17 additions & 11 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic,
SingleShardInfo
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
type IMetaSetter,
type IProtoMessage,
type PubsubTopic,
type SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { determinePubsubTopic, Logger } from "@waku/utils";
Expand Down Expand Up @@ -79,6 +80,10 @@ class Encoder implements IEncoder {
}

export interface EncoderOptions extends BaseEncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
/** The public key to encrypt the payload for. */
publicKey: Uint8Array;
/** An optional private key to be used to sign the payload before encryption. */
Expand All @@ -98,6 +103,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
publicKey,
Expand All @@ -106,7 +112,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
contentTopic,
publicKey,
sigPrivKey,
Expand Down Expand Up @@ -194,7 +200,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
24 changes: 13 additions & 11 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic,
SingleShardInfo
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
type IMetaSetter,
type IProtoMessage,
type PubsubTopic,
type SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { determinePubsubTopic, Logger } from "@waku/utils";
Expand Down Expand Up @@ -98,6 +99,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
pubsubTopicShardInfo,
contentTopic,
symKey,
Expand All @@ -106,7 +108,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
contentTopic,
symKey,
sigPrivKey,
Expand Down Expand Up @@ -194,7 +196,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
10 changes: 6 additions & 4 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub";
import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubsubTopic } from "@waku/interfaces";
import {
ActiveSubscriptions,
Callback,
DefaultPubsubTopic,
IAsyncIterator,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -72,9 +72,11 @@ class Relay implements IRelay {
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);
this.pubsubTopics = new Set(
options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: options?.pubsubTopics ?? [DefaultPubsubTopic]
);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
Expand Down
Loading

0 comments on commit e7fe3ff

Please sign in to comment.