Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reintroduce and deprecate named sharding, make autosharding default for all other cases #1751

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type {
IBaseProtocol,
Libp2pComponents,
PubsubTopic,
ShardingParams
ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
Expand Down Expand Up @@ -103,9 +103,12 @@ export class BaseProtocol implements IBaseProtocol {
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}

initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] {
return shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultPubsubTopic];
initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
return (
options?.pubsubTopics ??
(options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: [DefaultPubsubTopic])
);
}
}
11 changes: 6 additions & 5 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics = this.initializePubsubTopic(options);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -291,11 +291,12 @@ class Filter extends BaseProtocol implements IReceiver {
}

async createSubscription(
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<Subscription> {
const pubsubTopic = pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;
const pubsubTopic =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering is this.initializePubsubTopic can be used here or adjusted to be used

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.initializePubsubTopic is used to set the pubsub topics for the Filter object, while this const pubsubTopic is used to get/set the pubsub topic for the Subscription object.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated ternary expression to be less verbose in latest commit

typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: singleShardInfoToPubsubTopic(pubsubTopicShardInfo);

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class LightPush extends BaseProtocol implements ILightPush {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics = this.initializePubsubTopic(options);
}

private async preparePushMessage(
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder({
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
ephemeral,
Expand All @@ -126,7 +127,7 @@ export function createEncoder({
return new Encoder(
contentTopic,
ephemeral,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
metaSetter
);
}
Expand Down Expand Up @@ -186,7 +187,7 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Store extends BaseProtocol implements IStore {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics = this.initializePubsubTopic(options);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class WakuNode implements Waku {

constructor(
options: WakuOptions,
pubsubTopics: PubsubTopic[] = [],
libp2p: Libp2p,
pubsubShardInfo?: ShardingParams,
store?: (libp2p: Libp2p) => IStore,
Expand All @@ -63,7 +64,8 @@ export class WakuNode implements Waku {
relay?: (libp2p: Libp2p) => IRelay
) {
if (!pubsubShardInfo) {
this.pubsubTopics = [DefaultPubsubTopic];
this.pubsubTopics =
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface/peer-id";

import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js";
import type { ContentTopic } from "./misc.js";
import type { ContentTopic, PubsubTopic } from "./misc.js";
import type { Callback, IBaseProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";

Expand All @@ -25,7 +25,7 @@ export interface IFilterSubscription {
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic,
peerId?: PeerId
): Promise<IFilterSubscription>;
};
4 changes: 4 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export interface IMetaSetter {
}

export interface EncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
pubsubTopicShardInfo?: SingleShardInfo;
Comment on lines +51 to 53
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a fan of this redundancy :(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danisharora099 This is temporary and a deprecated option. I kept it to allow autosharding support without introducing breaking changes for any existing users.

Another option would be to just keep pubsubTopic and make its type pubsubTopic?: PubsubTopic | SingleShardInfo, but I'm not sure how to communicate that we're deprecating part of a type. I'm open to other suggestions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think deprecation period is a good way to go so that we will fully remove the flag in the upcoming update.
In which case I think we should bump minor version and release 0.1.*.

/** The content topic to set on outgoing messages. */
contentTopic: string;
Expand Down
14 changes: 13 additions & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic } from "./misc.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -26,9 +27,20 @@ export type ContentTopicInfo = {
contentTopics: string[];
};

export type ShardingParams = ShardInfo | ContentTopicInfo;
export type ApplicationInfo = {
clusterId: number;
application: string;
version: string;
};

export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;

export type ProtocolCreateOptions = {
/**
* @deprecated
* Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward.
*/
pubsubTopics?: PubsubTopic[];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adklempner can you, please, create a task with description of what should be removed to completely deprecate parameters/functionality? So that we address it in a skip release or something

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #1775

/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:
Expand Down
28 changes: 17 additions & 11 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic,
SingleShardInfo
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
type IMetaSetter,
type IProtoMessage,
type PubsubTopic,
type SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { determinePubsubTopic, Logger } from "@waku/utils";
Expand Down Expand Up @@ -79,6 +80,10 @@ class Encoder implements IEncoder {
}

export interface EncoderOptions extends BaseEncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
/** The public key to encrypt the payload for. */
publicKey: Uint8Array;
/** An optional private key to be used to sign the payload before encryption. */
Expand All @@ -98,6 +103,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
publicKey,
Expand All @@ -106,7 +112,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
contentTopic,
publicKey,
sigPrivKey,
Expand Down Expand Up @@ -194,7 +200,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
24 changes: 13 additions & 11 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic,
SingleShardInfo
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
type IMetaSetter,
type IProtoMessage,
type PubsubTopic,
type SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { determinePubsubTopic, Logger } from "@waku/utils";
Expand Down Expand Up @@ -98,6 +99,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
pubsubTopicShardInfo,
contentTopic,
symKey,
Expand All @@ -106,7 +108,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
contentTopic,
symKey,
sigPrivKey,
Expand Down Expand Up @@ -194,7 +196,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
10 changes: 6 additions & 4 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub";
import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubsubTopic } from "@waku/interfaces";
import {
ActiveSubscriptions,
Callback,
DefaultPubsubTopic,
IAsyncIterator,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -72,9 +72,11 @@ class Relay implements IRelay {
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);
this.pubsubTopics = new Set(
options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: options?.pubsubTopics ?? [DefaultPubsubTopic]
);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
Expand Down
Loading
Loading