Skip to content

Commit

Permalink
P2P announce segments on http requests state change.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Nov 16, 2023
1 parent 3b4dbdd commit 5d5865b
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 26 deletions.
3 changes: 2 additions & 1 deletion packages/p2p-media-loader-core/src/event-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export class EventDispatcher<
T extends { [key: string]: (...args: any) => any },
// eslint-disable-next-line @typescript-eslint/no-explicit-any
T extends { [key: string]: (...args: any) => void | Promise<void> },
K extends keyof T = keyof T
> {
private readonly listeners = new Map<keyof T, Set<T[K]>>();
Expand Down
23 changes: 18 additions & 5 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ export class HybridLoader {
private lastQueueProcessingTimeStamp?: number;
private readonly segmentAvgDuration: number;
private randomHttpDownloadInterval!: number;
private readonly logger: { engine: debug.Debugger; loader: debug.Debugger };
private readonly logger: {
engine: debug.Debugger;
loader: debug.Debugger;
};
private readonly levelBandwidth = { value: 0, refreshCount: 0 };
private isProcessQueueMicrotaskCreated = false;

Expand Down Expand Up @@ -212,6 +215,7 @@ export class HybridLoader {
request.subscribe("onSuccess", this.onRequestSuccess);
request.subscribe("onError", this.onRequestError);

this.p2pLoaders.currentLoader.broadcastAnnouncement();
void fulfillHttpSegmentRequest(request, this.settings);
if (!isRandom) {
this.logger.loader(
Expand All @@ -230,13 +234,22 @@ export class HybridLoader {
}

private onRequestSuccess = (request: Request, data: ArrayBuffer) => {
const { segment } = request;
this.logger.loader(`http responses: ${segment.externalId}`);
this.eventHandlers?.onSegmentLoaded?.(data.byteLength, "http");
const requestType = request.type;
if (!requestType) return;

if (requestType === "http") {
this.logger.loader(`http responses: ${request.segment.externalId}`);
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}

this.eventHandlers?.onSegmentLoaded?.(data.byteLength, requestType);
this.createProcessQueueMicrotask();
};

private onRequestError = () => {
private onRequestError = (request: Request) => {
if (request.type === "http") {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
this.createProcessQueueMicrotask();
};

Expand Down
7 changes: 3 additions & 4 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export class P2PLoader {
this.stream,
this.broadcastAnnouncement
);
// this.requests.subscribeOnHttpRequestsUpdate(this.broadcastAnnouncement);
this.trackerClient.start();
}

Expand Down Expand Up @@ -103,6 +102,7 @@ export class P2PLoader {

get connectedPeersAmount() {
let count = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for (const peer of this.trackerClient.peers()) count++;
return count;
}
Expand All @@ -127,7 +127,7 @@ export class P2PLoader {
peer.sendSegmentsAnnouncement(announcement);
}

private broadcastAnnouncement = () => {
broadcastAnnouncement() {
if (this.isAnnounceMicrotaskCreated) return;

this.isAnnounceMicrotaskCreated = true;
Expand All @@ -138,7 +138,7 @@ export class P2PLoader {
}
this.isAnnounceMicrotaskCreated = false;
});
};
}

private async onSegmentRequested(peer: Peer, segmentExternalId: string) {
const segment = StreamUtils.getSegmentFromStreamByExternalId(
Expand All @@ -159,7 +159,6 @@ export class P2PLoader {
this.stream,
this.broadcastAnnouncement
);
// this.requests.unsubscribeFromHttpRequestsUpdate(this.broadcastAnnouncement);
this.trackerClient.destroy();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export class P2PLoadersContainer {
}

private setLoaderDestroyTimeout(item: P2PLoaderContainerItem) {
// TODO: use Timeout class instead
item.destroyTimeoutId = window.setTimeout(
() => this.destroyAndRemoveLoader(item),
this.settings.p2pLoaderDestroyTimeoutMs
Expand Down
17 changes: 2 additions & 15 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ export class Request extends EventDispatcher<RequestEvents> {
const data = Utils.joinChunks(this.chunks);
this._status = "succeed";
this.prevAttempts.push(this.currentAttempt);
this.currentAttempt = undefined;

this._engineCallbacks?.onSuccess({
data,
Expand Down Expand Up @@ -225,7 +224,6 @@ export class Request extends EventDispatcher<RequestEvents> {
}
this.currentAttempt.error = error;
this.prevAttempts.push(this.currentAttempt);
this.currentAttempt = undefined;
this.dispatch("onError", this, error);
}

Expand Down Expand Up @@ -283,20 +281,9 @@ export class RequestError<
static isRequestInnerErrorType(
error: RequestError
): error is RequestError<RequestInnerErrorType> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return requestInnerErrorTypes.includes(error.type as any);
}

static isPeerErrorType(
error: RequestError
): error is RequestError<PeerRequestErrorType> {
return peerRequestErrorTypes.includes(error.type as any);
}

static isHttpErrorType(
error: RequestError
): error is RequestError<HttpRequestErrorType> {
return peerRequestErrorTypes.includes(error.type as any);
}
}

export class CoreRequestError extends Error {
Expand All @@ -305,7 +292,7 @@ export class CoreRequestError extends Error {
}
}

class Timeout {
export class Timeout {
private timeoutId?: number;

constructor(private readonly action: () => void) {}
Expand Down

0 comments on commit 5d5865b

Please sign in to comment.