-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: lighten retry logic for LightPush #2182
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
771440c
feat: lighten retry logic for LightPush
weboko 92d0a28
update tests
weboko 9d1afd7
remove base protocol sdk from light push, add unit tests for light push
weboko 9ec36c3
remove replaced test
weboko 0297682
ensure numPeersToUse is respected
weboko 9552c7c
skip tests
weboko 8fc68be
Merge branch 'master' into weboko/lighten-send-retry
weboko File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js"; | ||
import { IBaseProtocolCore } from "./protocols.js"; | ||
import type { ISender } from "./sender.js"; | ||
|
||
export type ILightPush = ISender & | ||
IBaseProtocolSDK & { protocol: IBaseProtocolCore }; | ||
export type ILightPush = ISender & { protocol: IBaseProtocolCore }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,23 @@ | ||
import type { IEncoder, IMessage } from "./message.js"; | ||
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js"; | ||
import { SDKProtocolResult } from "./protocols.js"; | ||
|
||
export type ISenderOptions = { | ||
/** | ||
* Enables retry of a message that was failed to be sent. | ||
* @default false | ||
*/ | ||
autoRetry?: boolean; | ||
/** | ||
* Sets number of attempts if `autoRetry` is enabled. | ||
* @default 3 | ||
*/ | ||
maxAttempts?: number; | ||
}; | ||
|
||
export interface ISender { | ||
send: ( | ||
encoder: IEncoder, | ||
message: IMessage, | ||
sendOptions?: ProtocolUseOptions | ||
sendOptions?: ISenderOptions | ||
) => Promise<SDKProtocolResult>; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
170 changes: 170 additions & 0 deletions
170
packages/sdk/src/protocols/light_push/light_push.spec.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
import { Peer } from "@libp2p/interface"; | ||
import { | ||
ConnectionManager, | ||
createEncoder, | ||
Encoder, | ||
LightPushCodec | ||
} from "@waku/core"; | ||
import { Libp2p, ProtocolError } from "@waku/interfaces"; | ||
import { utf8ToBytes } from "@waku/utils/bytes"; | ||
import { expect } from "chai"; | ||
import sinon from "sinon"; | ||
|
||
import { LightPush } from "./light_push.js"; | ||
|
||
const PUBSUB_TOPIC = "/waku/2/rs/1/4"; | ||
const CONTENT_TOPIC = "/test/1/waku-light-push/utf8"; | ||
|
||
describe("LightPush SDK", () => { | ||
let libp2p: Libp2p; | ||
let encoder: Encoder; | ||
let lightPush: LightPush; | ||
|
||
beforeEach(() => { | ||
libp2p = mockLibp2p(); | ||
encoder = createEncoder({ contentTopic: CONTENT_TOPIC }); | ||
lightPush = mockLightPush({ libp2p }); | ||
}); | ||
|
||
it("should fail to send if pubsub topics are misconfigured", async () => { | ||
lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] }); | ||
|
||
const result = await lightPush.send(encoder, { | ||
payload: utf8ToBytes("test") | ||
}); | ||
const failures = result.failures ?? []; | ||
|
||
expect(failures.length).to.be.eq(1); | ||
expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED)) | ||
.to.be.true; | ||
}); | ||
|
||
it("should fail to send if no connected peers found", async () => { | ||
const result = await lightPush.send(encoder, { | ||
payload: utf8ToBytes("test") | ||
}); | ||
const failures = result.failures ?? []; | ||
|
||
expect(failures.length).to.be.eq(1); | ||
expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to | ||
.be.true; | ||
}); | ||
|
||
it("should send to specified number of peers of used peers", async () => { | ||
libp2p = mockLibp2p({ | ||
peers: [mockPeer("1"), mockPeer("2"), mockPeer("3"), mockPeer("4")] | ||
}); | ||
|
||
// check default value that should be 2 | ||
lightPush = mockLightPush({ libp2p }); | ||
let sendSpy = sinon.spy( | ||
(_encoder: any, _message: any, peer: Peer) => | ||
({ success: peer.id }) as any | ||
); | ||
lightPush.protocol.send = sendSpy; | ||
|
||
let result = await lightPush.send(encoder, { | ||
payload: utf8ToBytes("test") | ||
}); | ||
|
||
expect(sendSpy.calledTwice).to.be.true; | ||
expect(result.successes?.length).to.be.eq(2); | ||
|
||
// check if setting another value works | ||
lightPush = mockLightPush({ libp2p, numPeersToUse: 3 }); | ||
sendSpy = sinon.spy( | ||
(_encoder: any, _message: any, peer: Peer) => | ||
({ success: peer.id }) as any | ||
); | ||
lightPush.protocol.send = sendSpy; | ||
|
||
result = await lightPush.send(encoder, { payload: utf8ToBytes("test") }); | ||
|
||
expect(sendSpy.calledThrice).to.be.true; | ||
expect(result.successes?.length).to.be.eq(3); | ||
}); | ||
|
||
it("should retry on failure if specified", async () => { | ||
libp2p = mockLibp2p({ | ||
peers: [mockPeer("1"), mockPeer("2")] | ||
}); | ||
|
||
lightPush = mockLightPush({ libp2p }); | ||
let sendSpy = sinon.spy((_encoder: any, _message: any, peer: Peer) => { | ||
if (peer.id.toString() === "1") { | ||
return { success: peer.id }; | ||
} | ||
|
||
return { failure: { error: "problem" } }; | ||
}); | ||
lightPush.protocol.send = sendSpy as any; | ||
const attemptRetriesSpy = sinon.spy(lightPush["attemptRetries"]); | ||
lightPush["attemptRetries"] = attemptRetriesSpy; | ||
|
||
const result = await lightPush.send( | ||
encoder, | ||
{ payload: utf8ToBytes("test") }, | ||
{ autoRetry: true } | ||
); | ||
|
||
expect(attemptRetriesSpy.calledOnce).to.be.true; | ||
expect(result.successes?.length).to.be.eq(1); | ||
expect(result.failures?.length).to.be.eq(1); | ||
|
||
sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any; | ||
await lightPush["attemptRetries"](sendSpy as any); | ||
|
||
expect(sendSpy.callCount).to.be.eq(3); | ||
|
||
sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any; | ||
await lightPush["attemptRetries"](sendSpy as any, 2); | ||
|
||
expect(sendSpy.callCount).to.be.eq(2); | ||
}); | ||
}); | ||
|
||
type MockLibp2pOptions = { | ||
peers?: Peer[]; | ||
}; | ||
|
||
function mockLibp2p(options?: MockLibp2pOptions): Libp2p { | ||
const peers = options?.peers || []; | ||
const peerStore = { | ||
get: (id: any) => Promise.resolve(peers.find((p) => p.id === id)) | ||
}; | ||
|
||
return { | ||
peerStore, | ||
getPeers: () => peers.map((p) => p.id), | ||
components: { | ||
events: new EventTarget(), | ||
connectionManager: { | ||
getConnections: () => [] | ||
} as any, | ||
peerStore | ||
} | ||
} as unknown as Libp2p; | ||
} | ||
|
||
type MockLightPushOptions = { | ||
libp2p: Libp2p; | ||
pubsubTopics?: string[]; | ||
numPeersToUse?: number; | ||
}; | ||
|
||
function mockLightPush(options: MockLightPushOptions): LightPush { | ||
return new LightPush( | ||
{ | ||
configuredPubsubTopics: options.pubsubTopics || [PUBSUB_TOPIC] | ||
} as ConnectionManager, | ||
options.libp2p, | ||
{ numPeersToUse: options.numPeersToUse } | ||
); | ||
} | ||
|
||
function mockPeer(id: string): Peer { | ||
return { | ||
id, | ||
protocols: [LightPushCodec] | ||
} as unknown as Peer; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,53 +6,51 @@ import { | |
LightPushCore | ||
} from "@waku/core"; | ||
import { | ||
type CoreProtocolResult, | ||
Failure, | ||
type IEncoder, | ||
ILightPush, | ||
type IMessage, | ||
type ISenderOptions, | ||
type Libp2p, | ||
type ProtocolCreateOptions, | ||
ProtocolError, | ||
ProtocolUseOptions, | ||
SDKProtocolResult | ||
} from "@waku/interfaces"; | ||
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; | ||
|
||
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; | ||
import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js"; | ||
import { BaseProtocolSDK } from "../base_protocol.js"; | ||
import { DEFAULT_NUM_PEERS_TO_USE } from "../base_protocol.js"; | ||
|
||
const log = new Logger("sdk:light-push"); | ||
|
||
class LightPush extends BaseProtocolSDK implements ILightPush { | ||
public readonly protocol: LightPushCore; | ||
const DEFAULT_MAX_ATTEMPTS = 3; | ||
const DEFAULT_SEND_OPTIONS: ISenderOptions = { | ||
autoRetry: false, | ||
maxAttempts: DEFAULT_MAX_ATTEMPTS | ||
}; | ||
|
||
type RetryCallback = (peer: Peer) => Promise<CoreProtocolResult>; | ||
|
||
private readonly reliabilityMonitor: SenderReliabilityMonitor; | ||
export class LightPush implements ILightPush { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
private numPeersToUse: number = DEFAULT_NUM_PEERS_TO_USE; | ||
public readonly protocol: LightPushCore; | ||
|
||
public constructor( | ||
connectionManager: ConnectionManager, | ||
private libp2p: Libp2p, | ||
options?: ProtocolCreateOptions | ||
) { | ||
super( | ||
new LightPushCore(connectionManager.configuredPubsubTopics, libp2p), | ||
connectionManager, | ||
{ | ||
numPeersToUse: options?.numPeersToUse | ||
} | ||
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; | ||
this.protocol = new LightPushCore( | ||
connectionManager.configuredPubsubTopics, | ||
libp2p | ||
); | ||
|
||
this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor( | ||
this.renewPeer.bind(this) | ||
); | ||
|
||
this.protocol = this.core as LightPushCore; | ||
} | ||
|
||
public async send( | ||
encoder: IEncoder, | ||
message: IMessage, | ||
_options?: ProtocolUseOptions | ||
options: ISenderOptions = DEFAULT_SEND_OPTIONS | ||
): Promise<SDKProtocolResult> { | ||
const successes: PeerId[] = []; | ||
const failures: Failure[] = []; | ||
|
@@ -105,14 +103,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush { | |
if (failure) { | ||
failures.push(failure); | ||
|
||
const connectedPeer = this.connectedPeers.find((connectedPeer) => | ||
connectedPeer.id.equals(failure.peerId) | ||
); | ||
|
||
if (connectedPeer) { | ||
void this.reliabilityMonitor.attemptRetriesOrRenew( | ||
connectedPeer.id, | ||
() => this.protocol.send(encoder, message, connectedPeer) | ||
if (options?.autoRetry) { | ||
void this.attemptRetries( | ||
(peer: Peer) => this.protocol.send(encoder, message, peer), | ||
options.maxAttempts | ||
); | ||
} | ||
} | ||
|
@@ -129,6 +123,32 @@ class LightPush extends BaseProtocolSDK implements ILightPush { | |
}; | ||
} | ||
|
||
private async attemptRetries( | ||
fn: RetryCallback, | ||
maxAttempts?: number | ||
): Promise<void> { | ||
maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS; | ||
const connectedPeers = await this.getConnectedPeers(); | ||
|
||
if (connectedPeers.length === 0) { | ||
log.warn("Cannot retry with no connected peers."); | ||
return; | ||
} | ||
|
||
for (let i = 0; i < maxAttempts; i++) { | ||
const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already | ||
const response = await fn(peer); | ||
|
||
if (response.success) { | ||
return; | ||
} | ||
|
||
log.info( | ||
`Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}` | ||
); | ||
} | ||
} | ||
|
||
private async getConnectedPeers(): Promise<Peer[]> { | ||
const peerIDs = this.libp2p.getPeers(); | ||
|
||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this might be a breaking change since
ProtocolUseOptions
had a variable not inISenderOptions
. If someone has code where that optional variable was set, will it break upon updating?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true and no,
ISenderOptions
actually returns previously existed properties and now they are respected in this implementationso the only breaking change is the naming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new interface doesn't have
forceUseAllPeers?: boolean;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
js-waku/packages/interfaces/src/protocols.ts
Line 41 in b2efce5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good spot, afaik it wasn't used outside some of our tests