Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: lighten retry logic for LightPush #2182

Merged
merged 7 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/interfaces/src/light_push.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
import { IBaseProtocolCore } from "./protocols.js";
import type { ISender } from "./sender.js";

export type ILightPush = ISender &
IBaseProtocolSDK & { protocol: IBaseProtocolCore };
export type ILightPush = ISender & { protocol: IBaseProtocolCore };
17 changes: 15 additions & 2 deletions packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
import type { IEncoder, IMessage } from "./message.js";
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js";
import { SDKProtocolResult } from "./protocols.js";

export type ISenderOptions = {
/**
* Enables retry of a message that was failed to be sent.
* @default false
*/
autoRetry?: boolean;
/**
* Sets number of attempts if `autoRetry` is enabled.
* @default 3
*/
maxAttempts?: number;
};

export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: ProtocolUseOptions
sendOptions?: ISenderOptions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might be a breaking change since ProtocolUseOptions had a variable not in ISenderOptions. If someone has code where that optional variable was set, will it break upon updating?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true and no, ISenderOptions actually returns previously existed properties and now they are respected in this implementation

so the only breaking change is the naming

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new interface doesn't have forceUseAllPeers?: boolean;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forceUseAllPeers?: boolean;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good spot, afaik it wasn't used outside some of our tests

) => Promise<SDKProtocolResult>;
}
8 changes: 4 additions & 4 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface Options {
maintainPeersInterval?: number;
}

const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
Expand All @@ -29,20 +29,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
) {
this.log = new Logger(`sdk:${core.multicodec}`);

this.peerManager = new PeerManager(connectionManager, core, this.log);

this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;

this.peerManager = new PeerManager(connectionManager, core, this.log);

this.log.info(
`Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms`
);
void this.startMaintainPeersInterval(maintainPeersInterval);
}

public get connectedPeers(): Peer[] {
return this.peerManager.getPeers();
return this.peerManager.getPeers().slice(0, this.numPeersToUse);
}

/**
Expand Down
170 changes: 170 additions & 0 deletions packages/sdk/src/protocols/light_push/light_push.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { Peer } from "@libp2p/interface";
import {
ConnectionManager,
createEncoder,
Encoder,
LightPushCodec
} from "@waku/core";
import { Libp2p, ProtocolError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import sinon from "sinon";

import { LightPush } from "./light_push.js";

const PUBSUB_TOPIC = "/waku/2/rs/1/4";
const CONTENT_TOPIC = "/test/1/waku-light-push/utf8";

describe("LightPush SDK", () => {
let libp2p: Libp2p;
let encoder: Encoder;
let lightPush: LightPush;

beforeEach(() => {
libp2p = mockLibp2p();
encoder = createEncoder({ contentTopic: CONTENT_TOPIC });
lightPush = mockLightPush({ libp2p });
});

it("should fail to send if pubsub topics are misconfigured", async () => {
lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] });

const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});
const failures = result.failures ?? [];

expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED))
.to.be.true;
});

it("should fail to send if no connected peers found", async () => {
const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});
const failures = result.failures ?? [];

expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to
.be.true;
});

it("should send to specified number of peers of used peers", async () => {
libp2p = mockLibp2p({
peers: [mockPeer("1"), mockPeer("2"), mockPeer("3"), mockPeer("4")]
});

// check default value that should be 2
lightPush = mockLightPush({ libp2p });
let sendSpy = sinon.spy(
(_encoder: any, _message: any, peer: Peer) =>
({ success: peer.id }) as any
);
lightPush.protocol.send = sendSpy;

let result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});

expect(sendSpy.calledTwice).to.be.true;
expect(result.successes?.length).to.be.eq(2);

// check if setting another value works
lightPush = mockLightPush({ libp2p, numPeersToUse: 3 });
sendSpy = sinon.spy(
(_encoder: any, _message: any, peer: Peer) =>
({ success: peer.id }) as any
);
lightPush.protocol.send = sendSpy;

result = await lightPush.send(encoder, { payload: utf8ToBytes("test") });

expect(sendSpy.calledThrice).to.be.true;
expect(result.successes?.length).to.be.eq(3);
});

it("should retry on failure if specified", async () => {
libp2p = mockLibp2p({
peers: [mockPeer("1"), mockPeer("2")]
});

lightPush = mockLightPush({ libp2p });
let sendSpy = sinon.spy((_encoder: any, _message: any, peer: Peer) => {
if (peer.id.toString() === "1") {
return { success: peer.id };
}

return { failure: { error: "problem" } };
});
lightPush.protocol.send = sendSpy as any;
const attemptRetriesSpy = sinon.spy(lightPush["attemptRetries"]);
lightPush["attemptRetries"] = attemptRetriesSpy;

const result = await lightPush.send(
encoder,
{ payload: utf8ToBytes("test") },
{ autoRetry: true }
);

expect(attemptRetriesSpy.calledOnce).to.be.true;
expect(result.successes?.length).to.be.eq(1);
expect(result.failures?.length).to.be.eq(1);

sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any;
await lightPush["attemptRetries"](sendSpy as any);

expect(sendSpy.callCount).to.be.eq(3);

sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any;
await lightPush["attemptRetries"](sendSpy as any, 2);

expect(sendSpy.callCount).to.be.eq(2);
});
});

type MockLibp2pOptions = {
peers?: Peer[];
};

function mockLibp2p(options?: MockLibp2pOptions): Libp2p {
const peers = options?.peers || [];
const peerStore = {
get: (id: any) => Promise.resolve(peers.find((p) => p.id === id))
};

return {
peerStore,
getPeers: () => peers.map((p) => p.id),
components: {
events: new EventTarget(),
connectionManager: {
getConnections: () => []
} as any,
peerStore
}
} as unknown as Libp2p;
}

type MockLightPushOptions = {
libp2p: Libp2p;
pubsubTopics?: string[];
numPeersToUse?: number;
};

function mockLightPush(options: MockLightPushOptions): LightPush {
return new LightPush(
{
configuredPubsubTopics: options.pubsubTopics || [PUBSUB_TOPIC]
} as ConnectionManager,
options.libp2p,
{ numPeersToUse: options.numPeersToUse }
);
}

function mockPeer(id: string): Peer {
return {
id,
protocols: [LightPushCodec]
} as unknown as Peer;
}
76 changes: 48 additions & 28 deletions packages/sdk/src/protocols/light_push/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,51 @@ import {
LightPushCore
} from "@waku/core";
import {
type CoreProtocolResult,
Failure,
type IEncoder,
ILightPush,
type IMessage,
type ISenderOptions,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js";
import { BaseProtocolSDK } from "../base_protocol.js";
import { DEFAULT_NUM_PEERS_TO_USE } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;
const DEFAULT_MAX_ATTEMPTS = 3;
const DEFAULT_SEND_OPTIONS: ISenderOptions = {
autoRetry: false,
maxAttempts: DEFAULT_MAX_ATTEMPTS
};

type RetryCallback = (peer: Peer) => Promise<CoreProtocolResult>;

private readonly reliabilityMonitor: SenderReliabilityMonitor;
export class LightPush implements ILightPush {
Copy link
Collaborator Author

@weboko weboko Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed BaseProtocolSDK for now as it is not used
let's follow up in #2186

private numPeersToUse: number = DEFAULT_NUM_PEERS_TO_USE;
public readonly protocol: LightPushCore;

public constructor(
connectionManager: ConnectionManager,
private libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
new LightPushCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
{
numPeersToUse: options?.numPeersToUse
}
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
this.protocol = new LightPushCore(
connectionManager.configuredPubsubTopics,
libp2p
);

this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor(
this.renewPeer.bind(this)
);

this.protocol = this.core as LightPushCore;
}

public async send(
encoder: IEncoder,
message: IMessage,
_options?: ProtocolUseOptions
options: ISenderOptions = DEFAULT_SEND_OPTIONS
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const failures: Failure[] = [];
Expand Down Expand Up @@ -105,14 +103,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
if (failure) {
failures.push(failure);

const connectedPeer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);

if (connectedPeer) {
void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id,
() => this.protocol.send(encoder, message, connectedPeer)
if (options?.autoRetry) {
void this.attemptRetries(
(peer: Peer) => this.protocol.send(encoder, message, peer),
options.maxAttempts
);
}
}
Expand All @@ -129,6 +123,32 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
};
}

private async attemptRetries(
fn: RetryCallback,
maxAttempts?: number
): Promise<void> {
maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS;
const connectedPeers = await this.getConnectedPeers();

if (connectedPeers.length === 0) {
log.warn("Cannot retry with no connected peers.");
return;
}

for (let i = 0; i < maxAttempts; i++) {
const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already
const response = await fn(peer);

if (response.success) {
return;
}

log.info(
`Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}`
);
}
}

private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

Expand Down
Loading
Loading