Skip to content

Commit

Permalink
feat: improve filter subscriptions (#2193)
Browse files Browse the repository at this point in the history
* add message cache to Filter

* remove WakuOptions and use only ProtocolCreateOptions

* move subscribe options to createLightNode Fitler protocol options

* rename SubscriptionManager to Subscription

* rename to CreateNodeOptions

* add warning

* feat: introduce subscription manager (#2202)

* feat: inroduce subscription manager

* fix: make pipeline succeed (#2238)

* fix test

* use hardcoded value

* update playwright

* fix test:browser
  • Loading branch information
weboko authored Jan 29, 2025
1 parent 994d05e commit 1be6e2d
Show file tree
Hide file tree
Showing 25 changed files with 701 additions and 805 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
browser:
runs-on: ubuntu-latest
container:
image: mcr.microsoft.com/playwright:v1.48.0-jammy
image: mcr.microsoft.com/playwright:v1.50.0-jammy
env:
HOME: "/root"
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/playwright.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
timeout-minutes: 60
runs-on: ubuntu-latest
container:
image: mcr.microsoft.com/playwright:v1.48.0-jammy
image: mcr.microsoft.com/playwright:v1.50.0-jammy
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
Expand Down
2 changes: 1 addition & 1 deletion packages/browser-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"test": "npx playwright test"
},
"devDependencies": {
"@playwright/test": "^1.48.1",
"@playwright/test": "^1.50.0",
"@waku/create-app": "^0.1.1-504bcd4",
"dotenv-flow": "^4.1.0",
"serve": "^14.2.3"
Expand Down
32 changes: 24 additions & 8 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
callback: Callback<T>;
};

export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
enableLightPushFilterCheck?: boolean;
export type FilterProtocolOptions = {
/**
* Interval with which Filter subscription will attempt to send ping requests to subscribed peers.
*
* @default 60_000
*/
keepAliveIntervalMs: number;

/**
* Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one.
*
* @default 3
*/
pingsBeforePeerRenewed: number;

/**
* Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription.
* In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer.
*
* @default false
*/
enableLightPushFilterCheck: boolean;
};

export interface ISubscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options?: SubscribeOptions
callback: Callback<T>
): Promise<SDKProtocolResult>;

unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
Expand All @@ -38,8 +55,7 @@ export interface ISubscription {
export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
subscribeOptions?: SubscribeOptions
callback: Callback<T>
): Promise<SubscribeResult>;
};

Expand Down
24 changes: 13 additions & 11 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";

import type { ConnectionManagerOptions } from "./connection_manager.js";
import type { FilterProtocolOptions } from "./filter.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { ThisAndThat, ThisOrThat } from "./misc.js";
Expand All @@ -23,19 +24,13 @@ export type IBaseProtocolCore = {

export type NetworkConfig = StaticSharding | AutoSharding;

export type ProtocolCreateOptions = {
export type CreateNodeOptions = {
/**
* Configuration for determining the network in use.
*
* If using Static Sharding:
* Default value is configured for The Waku Network.
* The format to specify a shard is: clusterId: number, shards: number[]
* To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
* Set the user agent string to be used in identification of the node.
*
* If using Auto Sharding:
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details.
* You cannot add or remove content topics after initialization of the node.
* @default "js-waku"
*/
userAgent?: string;

/**
* Configuration for determining the network in use.
Expand Down Expand Up @@ -93,10 +88,17 @@ export type ProtocolCreateOptions = {
bootstrapPeers?: string[];

/**
* Configuration for connection manager. If not specified - default values are applied.
* Configuration for connection manager.
* If not specified - default values are applied.
*/
connectionManager?: Partial<ConnectionManagerOptions>;

/**
* Configuration for Filter protocol.
* If not specified - default values are applied.
*/
filter?: Partial<FilterProtocolOptions>;

/**
* Options for the Store protocol.
*/
Expand Down
13 changes: 4 additions & 9 deletions packages/relay/src/create.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import type { RelayNode } from "@waku/interfaces";
import {
createLibp2pAndUpdateOptions,
CreateWakuNodeOptions,
WakuNode,
WakuOptions
} from "@waku/sdk";
import type { CreateNodeOptions, RelayNode } from "@waku/interfaces";
import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk";

import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js";

Expand All @@ -19,7 +14,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js";
* or use this function with caution.
*/
export async function createRelayNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
options: CreateNodeOptions & Partial<RelayCreateOptions>
): Promise<RelayNode> {
options = {
...options,
Expand All @@ -36,7 +31,7 @@ export async function createRelayNode(

return new WakuNode(
pubsubTopics,
options as WakuOptions,
options as CreateNodeOptions,
libp2p,
{},
relay
Expand Down
4 changes: 2 additions & 2 deletions packages/relay/src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import { sha256 } from "@noble/hashes/sha256";
import {
ActiveSubscriptions,
Callback,
CreateNodeOptions,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
IRelay,
Libp2p,
ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
SDKProtocolResult
Expand All @@ -39,7 +39,7 @@ export type Observer<T extends IDecodedMessage> = {
callback: Callback<T>;
};

export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type RelayCreateOptions = CreateNodeOptions & GossipsubOpts;
export type ContentTopic = string;

/**
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/create/create.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type LightNode } from "@waku/interfaces";
import type { CreateNodeOptions, LightNode } from "@waku/interfaces";

import { CreateWakuNodeOptions, WakuNode } from "../waku/index.js";
import { WakuNode } from "../waku/index.js";

import { createLibp2pAndUpdateOptions } from "./libp2p.js";

Expand All @@ -10,7 +10,7 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js";
* Uses Waku Filter V2 by default.
*/
export async function createLightNode(
options: CreateWakuNodeOptions = {}
options: CreateNodeOptions = {}
): Promise<LightNode> {
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);

Expand Down
11 changes: 5 additions & 6 deletions packages/sdk/src/create/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { all as filterAll, wss } from "@libp2p/websockets/filters";
import { wakuMetadata } from "@waku/core";
import {
type CreateLibp2pOptions,
type CreateNodeOptions,
DefaultNetworkConfig,
type IMetadata,
type Libp2p,
Expand All @@ -18,11 +19,6 @@ import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils";
import { createLibp2p } from "libp2p";

import { isTestEnvironment } from "../env.js";
import {
CreateWakuNodeOptions,
DefaultPingMaxInboundStreams,
DefaultUserAgent
} from "../waku/index.js";

import { defaultPeerDiscoveries } from "./discovery.js";

Expand All @@ -32,6 +28,9 @@ type MetadataService = {

const log = new Logger("sdk:create");

const DefaultUserAgent = "js-waku";
const DefaultPingMaxInboundStreams = 10;

export async function defaultLibp2p(
pubsubTopics: PubsubTopic[],
options?: Partial<CreateLibp2pOptions>,
Expand Down Expand Up @@ -79,7 +78,7 @@ export async function defaultLibp2p(
}

export async function createLibp2pAndUpdateOptions(
options: CreateWakuNodeOptions
options: CreateNodeOptions
): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> {
const { networkConfig } = options;
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(
Expand Down
7 changes: 1 addition & 6 deletions packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_MAX_PINGS = 3;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;

export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
Loading

0 comments on commit 1be6e2d

Please sign in to comment.