Skip to content

Commit

Permalink
move relay functionality to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Jul 23, 2024
1 parent a739ada commit a4e8aab
Show file tree
Hide file tree
Showing 23 changed files with 384 additions and 422 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"packages/message-hash",
"packages/enr",
"packages/core",
"packages/relay",
"packages/discovery",
"packages/message-encryption",
"packages/sdk",
"packages/relay",
"packages/tests",
"packages/browser-tests",
"packages/build-utils",
Expand Down
7 changes: 0 additions & 7 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,3 @@ export interface RelayNode extends Waku {
filter: undefined;
lightPush: undefined;
}

export interface FullNode extends Waku {
relay: IRelay;
store: IStoreSDK;
filter: IFilterSDK;
lightPush: ILightPushSDK;
}
2 changes: 2 additions & 0 deletions packages/relay/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@noble/hashes": "^1.3.2",
"@waku/core": "0.0.30",
"@waku/sdk": "0.0.26",
"@waku/interfaces": "0.0.25",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.18",
Expand All @@ -67,6 +68,7 @@
"rollup": "^4.12.0"
},
"peerDependencies": {
"@waku/sdk": "0.0.26",
"@waku/core": "0.0.30",
"@waku/interfaces": "0.0.25",
"@waku/proto": "0.0.7",
Expand Down
40 changes: 40 additions & 0 deletions packages/relay/src/create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { type RelayNode } from "@waku/interfaces";
import {
createLibp2pAndUpdateOptions,
CreateWakuNodeOptions,
WakuNode,
WakuOptions
} from "@waku/sdk";

import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js";

/**
* Create a Waku node that uses Waku Relay to send and receive messages,
* enabling some privacy preserving properties.
* * @remarks
* This function creates a Relay Node using the Waku Relay protocol.
* While it is technically possible to use this function in a browser environment,
* it is not recommended due to potential performance issues and limited browser capabilities.
* If you are developing a browser-based application, consider alternative approaches like creating a Light Node
* or use this function with caution.
*/
export async function createRelayNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions> = {
pubsubTopics: []
}
): Promise<RelayNode> {
options = {
...options,
libp2p: {
...options.libp2p,
services: {
pubsub: wakuGossipSub(options)
}
}
};

const libp2p = await createLibp2pAndUpdateOptions(options);
const relay = wakuRelay(options?.pubsubTopics || [])(libp2p);

return new WakuNode(options as WakuOptions, libp2p, {}, relay) as RelayNode;
}
319 changes: 2 additions & 317 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,317 +1,2 @@
import {
GossipSub,
GossipSubComponents,
GossipsubMessage,
GossipsubOpts
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import {
ActiveSubscriptions,
Callback,
DefaultPubsubTopic,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
IRelay,
Libp2p,
ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
SDKProtocolResult
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
import { Logger } from "@waku/utils";

import { RelayCodecs } from "./constants.js";
import { messageValidator } from "./message_validator.js";
import { TopicOnlyDecoder } from "./topic_only_message.js";

const log = new Logger("relay");

export type Observer<T extends IDecodedMessage> = {
decoder: IDecoder<T>;
callback: Callback<T>;
};

export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type ContentTopic = string;

/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Throws if libp2p.pubsub does not support Waku Relay
*/
class Relay implements IRelay {
public readonly pubsubTopics: Set<PubsubTopic>;
private defaultDecoder: IDecoder<IDecodedMessage>;

public static multicodec: string = RelayCodecs[0];
public readonly gossipSub: GossipSub;

/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;

public constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) {
if (!this.isRelayPubsub(libp2p.services.pubsub)) {
throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
);
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = new Set(pubsubTopics);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
}

this.observers = new Map();

// Default PubsubTopic decoder
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder();
}

/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to all the topics.
*
* @override
* @returns {void}
*/
public async start(): Promise<void> {
if (this.gossipSub.isStarted()) {
throw Error("GossipSub already started.");
}

await this.gossipSub.start();
this.subscribeToAllTopics();
}

/**
* Send Waku message.
*/
public async send(
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];

const { pubsubTopic } = encoder;
if (!this.pubsubTopics.has(pubsubTopic)) {
log.error("Failed to send waku relay: topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
]
};
}

const msg = await encoder.toWire(message);
if (!msg) {
log.error("Failed to encode message, aborting publish");
return {
successes,
failures: [
{
error: ProtocolError.ENCODE_FAILED
}
]
};
}

if (!isWireSizeUnderCap(msg)) {
log.error("Failed to send waku relay: message is bigger that 1MB");
return {
successes,
failures: [
{
error: ProtocolError.SIZE_TOO_BIG
}
]
};
}

const { recipients } = await this.gossipSub.publish(pubsubTopic, msg);
return {
successes: recipients,
failures: []
};
}

public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observers: Array<[PubsubTopic, Observer<T>]> = [];

for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) {
const { pubsubTopic } = decoder;
const ctObs: Map<ContentTopic, Set<Observer<T>>> = this.observers.get(
pubsubTopic
) ?? new Map();
const observer = { pubsubTopic, decoder, callback };
pushOrInitMapSet(ctObs, decoder.contentTopic, observer);

this.observers.set(pubsubTopic, ctObs);
observers.push([pubsubTopic, observer]);
}

return () => {
this.removeObservers(observers);
};
}

private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubsubTopic, Observer<T>]>
): void {
for (const [pubsubTopic, observer] of observers) {
const ctObs = this.observers.get(pubsubTopic);
if (!ctObs) continue;

const contentTopic = observer.decoder.contentTopic;
const _obs = ctObs.get(contentTopic);
if (!_obs) continue;

_obs.delete(observer);
ctObs.set(contentTopic, _obs);
this.observers.set(pubsubTopic, ctObs);
}
}

public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
for (const pubsubTopic of this.pubsubTopics) {
map.set(pubsubTopic, Array.from(this.observers.keys()));
}
return map;
}

public getMeshPeers(topic: TopicStr = DefaultPubsubTopic): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic);
}

private subscribeToAllTopics(): void {
for (const pubsubTopic of this.pubsubTopics) {
this.gossipSubSubscribe(pubsubTopic);
}
}

private async processIncomingMessage<T extends IDecodedMessage>(
pubsubTopic: string,
bytes: Uint8Array
): Promise<void> {
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log.warn("Message does not have a content topic, skipping");
return;
}

// Retrieve the map of content topics for the given pubsubTopic
const contentTopicMap = this.observers.get(pubsubTopic);
if (!contentTopicMap) {
return;
}

// Retrieve the set of observers for the given contentTopic
const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set<
Observer<T>
>;
if (!observers) {
return;
}

await Promise.all(
Array.from(observers).map(({ decoder, callback }) => {
return (async () => {
try {
const protoMsg = await decoder.fromWireToProtoObj(bytes);
if (!protoMsg) {
log.error(
"Internal error: message previously decoded failed on 2nd pass."
);
return;
}
const msg = await decoder.fromProtoObj(pubsubTopic, protoMsg);
if (msg) {
await callback(msg);
} else {
log.error(
"Failed to decode messages on",
topicOnlyMsg.contentTopic
);
}
} catch (error) {
log.error("Error while decoding message:", error);
}
})();
})
);
}

/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
* @override
*/
private gossipSubSubscribe(pubsubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubsubTopic) return;

this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
}
);

this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
this.gossipSub.subscribe(pubsubTopic);
}

private isRelayPubsub(pubsub: Libp2pPubsub | undefined): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false;
}
}

export function wakuRelay(
pubsubTopics: PubsubTopic[]
): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics);
}

export function wakuGossipSub(
init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => GossipSub {
return (components: GossipSubComponents) => {
init = {
...init,
msgIdFn: ({ data }) => sha256(data),
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false
};
const pubsub = new GossipSub(components, init);
pubsub.multicodecs = RelayCodecs;
return pubsub;
};
}
export * from "./relay.js";
export * from "./create.js";
Loading

0 comments on commit a4e8aab

Please sign in to comment.