Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: attempt to fix some of the Filter issues #2183

Merged
merged 15 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/interfaces/src/light_push.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
import { IBaseProtocolCore } from "./protocols.js";
import type { ISender } from "./sender.js";

export type ILightPush = ISender &
IBaseProtocolSDK & { protocol: IBaseProtocolCore };
export type ILightPush = ISender & { protocol: IBaseProtocolCore };
17 changes: 15 additions & 2 deletions packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
import type { IEncoder, IMessage } from "./message.js";
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js";
import { SDKProtocolResult } from "./protocols.js";

export type ISenderOptions = {
/**
* Enables retry of a message that was failed to be sent.
* @default false
*/
autoRetry?: boolean;
/**
* Sets number of attempts if `autoRetry` is enabled.
* @default 3
*/
maxAttempts?: number;
};

export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: ProtocolUseOptions
sendOptions?: ISenderOptions
) => Promise<SDKProtocolResult>;
}
8 changes: 4 additions & 4 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface Options {
maintainPeersInterval?: number;
}

const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
Expand All @@ -29,20 +29,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
) {
this.log = new Logger(`sdk:${core.multicodec}`);

this.peerManager = new PeerManager(connectionManager, core, this.log);

this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;

this.peerManager = new PeerManager(connectionManager, core, this.log);

this.log.info(
`Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms`
);
void this.startMaintainPeersInterval(maintainPeersInterval);
}

public get connectedPeers(): Peer[] {
return this.peerManager.getPeers();
return this.peerManager.getPeers().slice(0, this.numPeersToUse);
}

/**
Expand Down
43 changes: 23 additions & 20 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const log = new Logger("sdk:filter:subscription_manager");
export class SubscriptionManager implements ISubscription {
private reliabilityMonitor: ReceiverReliabilityMonitor;

private keepAliveTimer: number | null = null;
private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE;
private keepAliveInterval: ReturnType<typeof setInterval> | null = null;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
Expand Down Expand Up @@ -67,7 +69,7 @@ export class SubscriptionManager implements ISubscription {
options.maxMissedMessagesThreshold
);
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand Down Expand Up @@ -112,7 +114,7 @@ export class SubscriptionManager implements ISubscription {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

this.startSubscriptionsMaintenance(this.keepAliveTimer);
this.startSubscriptionsMaintenance(this.keepAliveTimeout);

return finalResult;
}
Expand Down Expand Up @@ -240,23 +242,26 @@ export class SubscriptionManager implements ISubscription {
let result;
try {
result = await this.protocol.ping(peer);
return result;
} catch (error) {
return {
result = {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
} finally {
void this.reliabilityMonitor.handlePingResult(peerId, result);
}

log.info(
`Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}`
);
await this.reliabilityMonitor.handlePingResult(peerId, result);
return result;
}

private startSubscriptionsMaintenance(interval: number): void {
private startSubscriptionsMaintenance(timeout: number): void {
log.info("Starting subscriptions maintenance");
this.startKeepAlivePings(interval);
this.startKeepAlivePings(timeout);
this.startConnectionListener();
}

Expand Down Expand Up @@ -295,31 +300,29 @@ export class SubscriptionManager implements ISubscription {
log.error(`networkStateListener failed to recover: ${err}`);
}

this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE);
this.startKeepAlivePings(this.keepAliveTimeout);
}

private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
private startKeepAlivePings(timeout: number): void {
if (this.keepAliveInterval) {
log.info("Recurring pings already set up.");
return;
}

this.keepAliveTimer = setInterval(() => {
void this.ping()
.then(() => log.info("Keep-alive ping successful"))
.catch((error) => log.error("Error in keep-alive ping cycle:", error));
}, interval) as unknown as number;
this.keepAliveInterval = setInterval(() => {
void this.ping();
}, timeout);
}

private stopKeepAlivePings(): void {
if (!this.keepAliveTimer) {
if (!this.keepAliveInterval) {
log.info("Already stopped recurring pings.");
return;
}

log.info("Stopping recurring pings.");
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = null;
}
}

Expand Down
170 changes: 170 additions & 0 deletions packages/sdk/src/protocols/light_push/light_push.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { Peer } from "@libp2p/interface";
import {
ConnectionManager,
createEncoder,
Encoder,
LightPushCodec
} from "@waku/core";
import { Libp2p, ProtocolError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import sinon from "sinon";

import { LightPush } from "./light_push.js";

const PUBSUB_TOPIC = "/waku/2/rs/1/4";
const CONTENT_TOPIC = "/test/1/waku-light-push/utf8";

describe("LightPush SDK", () => {
let libp2p: Libp2p;
let encoder: Encoder;
let lightPush: LightPush;

beforeEach(() => {
libp2p = mockLibp2p();
encoder = createEncoder({ contentTopic: CONTENT_TOPIC });
lightPush = mockLightPush({ libp2p });
});

it("should fail to send if pubsub topics are misconfigured", async () => {
lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] });

const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});
const failures = result.failures ?? [];

expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED))
.to.be.true;
});

it("should fail to send if no connected peers found", async () => {
const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});
const failures = result.failures ?? [];

expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to
.be.true;
});

it("should send to specified number of peers of used peers", async () => {
libp2p = mockLibp2p({
peers: [mockPeer("1"), mockPeer("2"), mockPeer("3"), mockPeer("4")]
});

// check default value that should be 2
lightPush = mockLightPush({ libp2p });
let sendSpy = sinon.spy(
(_encoder: any, _message: any, peer: Peer) =>
({ success: peer.id }) as any
);
lightPush.protocol.send = sendSpy;

let result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});

expect(sendSpy.calledTwice).to.be.true;
expect(result.successes?.length).to.be.eq(2);

// check if setting another value works
lightPush = mockLightPush({ libp2p, numPeersToUse: 3 });
sendSpy = sinon.spy(
(_encoder: any, _message: any, peer: Peer) =>
({ success: peer.id }) as any
);
lightPush.protocol.send = sendSpy;

result = await lightPush.send(encoder, { payload: utf8ToBytes("test") });

expect(sendSpy.calledThrice).to.be.true;
expect(result.successes?.length).to.be.eq(3);
});

it("should retry on failure if specified", async () => {
libp2p = mockLibp2p({
peers: [mockPeer("1"), mockPeer("2")]
});

lightPush = mockLightPush({ libp2p });
let sendSpy = sinon.spy((_encoder: any, _message: any, peer: Peer) => {
if (peer.id.toString() === "1") {
return { success: peer.id };
}

return { failure: { error: "problem" } };
});
lightPush.protocol.send = sendSpy as any;
const attemptRetriesSpy = sinon.spy(lightPush["attemptRetries"]);
lightPush["attemptRetries"] = attemptRetriesSpy;

const result = await lightPush.send(
encoder,
{ payload: utf8ToBytes("test") },
{ autoRetry: true }
);

expect(attemptRetriesSpy.calledOnce).to.be.true;
expect(result.successes?.length).to.be.eq(1);
expect(result.failures?.length).to.be.eq(1);

sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any;
await lightPush["attemptRetries"](sendSpy as any);

expect(sendSpy.callCount).to.be.eq(3);

sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any;
await lightPush["attemptRetries"](sendSpy as any, 2);

expect(sendSpy.callCount).to.be.eq(2);
});
});

type MockLibp2pOptions = {
peers?: Peer[];
};

function mockLibp2p(options?: MockLibp2pOptions): Libp2p {
const peers = options?.peers || [];
const peerStore = {
get: (id: any) => Promise.resolve(peers.find((p) => p.id === id))
};

return {
peerStore,
getPeers: () => peers.map((p) => p.id),
components: {
events: new EventTarget(),
connectionManager: {
getConnections: () => []
} as any,
peerStore
}
} as unknown as Libp2p;
}

type MockLightPushOptions = {
libp2p: Libp2p;
pubsubTopics?: string[];
numPeersToUse?: number;
};

function mockLightPush(options: MockLightPushOptions): LightPush {
return new LightPush(
{
configuredPubsubTopics: options.pubsubTopics || [PUBSUB_TOPIC]
} as ConnectionManager,
options.libp2p,
{ numPeersToUse: options.numPeersToUse }
);
}

function mockPeer(id: string): Peer {
return {
id,
protocols: [LightPushCodec]
} as unknown as Peer;
}
Loading
Loading