From 4a9bae97ddb68824d126930bf8857b827872a536 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Thu, 21 Dec 2023 20:15:15 +0200 Subject: [PATCH 01/11] Add ability to resume segment downloading starting from certain byte. --- p2p-media-loader-demo/index.html | 2 +- p2p-media-loader-demo/src/App.tsx | 2 + .../p2p-media-loader-core/src/http-loader.ts | 88 +++++++++++++-- .../src/p2p/peer-base.ts | 76 +++++++------ .../p2p-media-loader-core/src/p2p/peer.ts | 106 +++++++++++------- packages/p2p-media-loader-core/src/request.ts | 21 +++- .../src/stream-utils.ts | 1 + 7 files changed, 205 insertions(+), 91 deletions(-) diff --git a/p2p-media-loader-demo/index.html b/p2p-media-loader-demo/index.html index fccca772..4f96d729 100644 --- a/p2p-media-loader-demo/index.html +++ b/p2p-media-loader-demo/index.html @@ -6,7 +6,7 @@ Vite + React + TS - + diff --git a/p2p-media-loader-demo/src/App.tsx b/p2p-media-loader-demo/src/App.tsx index 5603ce7b..43160052 100644 --- a/p2p-media-loader-demo/src/App.tsx +++ b/p2p-media-loader-demo/src/App.tsx @@ -23,6 +23,8 @@ const streamUrls = { hlsBigBunnyBuck: "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8", hlsByteRangeVideo: "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8", + hlsOneLevelByteRangeVideo: + "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/gear1/prog_index.m3u8", hlsAdvancedVideo: "https://devstreaming-cdn.apple.com/videos/streaming/examples/adv_dv_atmos/main.m3u8", hlsAdvancedVideo2: diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index d81d9d3b..722b83ae 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -5,14 +5,17 @@ export async function fulfillHttpSegmentRequest( request: Request, settings: Pick ) { - const headers = new Headers(); - const { segment } = request; + const requestHeaders = new Headers(); + const { segment, loadedBytes: alreadyLoadedBytes } = request; const { url, byteRange } = segment; - if (byteRange) { - const { start, end } = byteRange; - const byteRangeString = `bytes=${start}-${end}`; - headers.set("Range", byteRangeString); + let byteFrom = byteRange?.start; + const byteTo = byteRange?.end; + if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes; + + if (byteFrom !== undefined) { + const byteRangeString = `bytes=${byteFrom}-${byteTo ?? ""}`; + requestHeaders.set("Range", byteRangeString); } const abortController = new AbortController(); @@ -25,18 +28,27 @@ export async function fulfillHttpSegmentRequest( ); try { const fetchResponse = await window.fetch(url, { - headers, + headers: requestHeaders, signal: abortController.signal, }); - requestControls.firstBytesReceived(); - if (!fetchResponse.ok) { throw new RequestError("fetch-error", fetchResponse.statusText); } - if (!fetchResponse.body) return; - const totalBytesString = fetchResponse.headers.get("Content-Length"); - if (totalBytesString) request.setTotalBytes(+totalBytesString); + requestControls.firstBytesReceived(); + + if ( + byteFrom !== undefined && + (fetchResponse.status !== 206 || + !isResponseWithRequestedContentRange(fetchResponse, byteFrom, byteTo)) + ) { + request.clearLoadedBytes(); + } + + if (request.totalBytes === undefined) { + const totalBytesString = fetchResponse.headers.get("Content-Length"); + if (totalBytesString) request.setTotalBytes(+totalBytesString); + } const reader = fetchResponse.body.getReader(); for await (const chunk of readStream(reader)) { @@ -66,3 +78,55 @@ async function* readStream( yield value; } } + +function getValueFromContentRangeHeader(headerValue: string) { + const match = headerValue + .trim() + .match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/); + if (!match) return; + + const [, from, to, total] = match; + return { + from: from ? parseInt(from) : undefined, + to: to ? parseInt(to) : undefined, + total: total ? parseInt(total) : undefined, + }; +} + +function isResponseWithRequestedContentRange( + response: Response, + requestedFromByte: number, + requestedToByte?: number +): boolean { + const requestedBytesAmount = + requestedToByte !== undefined + ? requestedToByte - requestedFromByte + 1 + : undefined; + + const { headers } = response; + const contentLengthHeader = headers.get("Content-Length"); + const contentLength = contentLengthHeader && parseInt(contentLengthHeader); + + if ( + contentLength && + requestedBytesAmount !== undefined && + requestedBytesAmount !== contentLength + ) { + return false; + } + + const contentRangeHeader = headers.get("Content-Range"); + const contentRange = + contentRangeHeader && getValueFromContentRangeHeader(contentRangeHeader); + if (!contentRange) return true; + const { from, to } = contentRange; + if (from !== requestedFromByte) return false; + if ( + to !== undefined && + requestedToByte !== undefined && + to !== requestedToByte + ) { + return false; + } + return true; +} diff --git a/packages/p2p-media-loader-core/src/p2p/peer-base.ts b/packages/p2p-media-loader-core/src/p2p/peer-base.ts index 202d4912..2ebf2498 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer-base.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer-base.ts @@ -1,7 +1,6 @@ import { PeerConnection } from "bittorrent-tracker"; import * as Command from "./commands"; import * as Utils from "../utils/utils"; -import debug from "debug"; import { Settings } from "../types"; export type PeerSettings = Pick< @@ -9,38 +8,44 @@ export type PeerSettings = Pick< "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" >; -export abstract class PeerBase { - readonly id: string; - private isUploadingSegment = false; +export class PeerInterface { private commandChunks?: Command.BinaryCommandChunksJoiner; - protected readonly logger = debug("core:peer"); + private uploadingContext?: { stopUploading: () => void }; - protected constructor( + constructor( private readonly connection: PeerConnection, - protected readonly settings: PeerSettings + private readonly settings: PeerSettings, + private readonly eventHandlers: { + onCommandReceived: (command: Command.PeerCommand) => void; + onSegmentChunkReceived: (data: Uint8Array) => void; + onDestroy: () => void; + } ) { - this.id = PeerBase.getPeerIdFromConnection(connection); connection.on("data", this.onDataReceived); connection.on("close", this.onPeerClosed); connection.on("error", this.onConnectionError); } private onDataReceived = (data: Uint8Array) => { - if (Command.isCommandChunk(data)) this.receivingCommandBytes(data); - else this.receiveSegmentChunk(data); + if (Command.isCommandChunk(data)) { + this.receivingCommandBytes(data); + } else { + this.eventHandlers.onSegmentChunkReceived(data); + } }; private onPeerClosed = () => { - this.logger(`connection with peer closed: ${this.id}`); this.destroy(); }; private onConnectionError = (error: { code: string }) => { - this.logger(`peer error: ${this.id} ${error.code}`); - this.destroy(); + if (error.code === "ERR_DATA_CHANNEL") { + this.destroy(); + this.eventHandlers.onDestroy(); + } }; - protected sendCommand(command: Command.PeerCommand) { + sendCommand(command: Command.PeerCommand) { const binaryCommandBuffers = Command.serializePeerCommand( command, this.settings.webRtcMaxMessageSize @@ -50,11 +55,26 @@ export abstract class PeerBase { } } - protected async splitDataToChunksAndUploadAsync(data: Uint8Array) { + stopUploadingSegmentData() { + this.uploadingContext?.stopUploading(); + this.uploadingContext = undefined; + } + + async splitSegmentDataToChunksAndUploadAsync(data: Uint8Array) { + if (this.uploadingContext) { + throw new Error(`Some segment data is already uploading.`); + } const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize); const channel = this.connection._channel; const { promise, resolve, reject } = Utils.getControlledPromise(); + let isUploadingSegmentData = false; + this.uploadingContext = { + stopUploading: () => { + isUploadingSegmentData = false; + }, + }; + const sendChunk = () => { while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) { const chunk = chunks.next().value; @@ -62,7 +82,7 @@ export abstract class PeerBase { resolve(); break; } - if (chunk && !this.isUploadingSegment) { + if (chunk && !isUploadingSegmentData) { reject(); break; } @@ -71,26 +91,23 @@ export abstract class PeerBase { }; try { channel.addEventListener("bufferedamountlow", sendChunk); - this.isUploadingSegment = true; + isUploadingSegmentData = true; sendChunk(); await promise; return promise; } finally { - this.isUploadingSegment = false; + channel.removeEventListener("bufferedamountlow", sendChunk); + this.uploadingContext = undefined; } } - protected cancelDataUploading() { - this.isUploadingSegment = false; - } - private receivingCommandBytes(buffer: Uint8Array) { if (!this.commandChunks) { this.commandChunks = new Command.BinaryCommandChunksJoiner( (commandBuffer) => { this.commandChunks = undefined; const command = Command.deserializeCommand(commandBuffer); - this.receiveCommand(command); + this.eventHandlers.onCommandReceived(command); } ); } @@ -102,23 +119,16 @@ export abstract class PeerBase { } } - protected abstract receiveCommand(command: Command.PeerCommand): void; - - protected abstract receiveSegmentChunk(data: Uint8Array): void; - - protected destroy() { + destroy() { this.connection.destroy(); - } - - static getPeerIdFromConnection(connection: PeerConnection) { - return Utils.hexToUtf8(connection.id); + this.eventHandlers.onDestroy(); } } function* getBufferChunks( data: ArrayBuffer, maxChunkSize: number -): Generator { +): Generator { let bytesLeft = data.byteLength; while (bytesLeft > 0) { const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft; diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 98f14cd9..ac22edd2 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -1,5 +1,5 @@ import { PeerConnection } from "bittorrent-tracker"; -import { PeerBase, PeerSettings } from "./peer-base"; +import { PeerInterface, PeerSettings } from "./peer-base"; import { Request, RequestControls, @@ -8,6 +8,8 @@ import { } from "../request"; import * as Command from "./commands"; import { Segment } from "../types"; +import * as Utils from "../utils/utils"; +import debug from "debug"; const { PeerCommandType } = Command; type PeerEventHandlers = { @@ -19,21 +21,33 @@ type PeerEventHandlers = { ) => void; }; -export class Peer extends PeerBase { - private requestContext?: { request: Request; controls: RequestControls }; +export class Peer { + private readonly id: string; + private readonly peerInterface; + private downloadingContext?: { + request: Request; + controls: RequestControls; + isSegmentDataCommandReceived: boolean; + }; private loadedSegments = new Set(); private httpLoadingSegments = new Set(); + private logger = debug("core:peer"); constructor( connection: PeerConnection, private readonly eventHandlers: PeerEventHandlers, - settings: PeerSettings + private readonly settings: PeerSettings ) { - super(connection, settings); + this.id = Peer.getPeerIdFromConnection(connection); + this.peerInterface = new PeerInterface(connection, settings, { + onSegmentChunkReceived: this.onSegmentChunkReceived, + onCommandReceived: this.onCommandReceived, + onDestroy: this.destroy, + }); } get downloadingSegment(): Segment | undefined { - return this.requestContext?.request.segment; + return this.downloadingContext?.request.segment; } getSegmentStatus(segment: Segment): "loaded" | "http-loading" | undefined { @@ -42,7 +56,7 @@ export class Peer extends PeerBase { if (this.httpLoadingSegments.has(externalId)) return "http-loading"; } - protected receiveCommand(command: Command.PeerCommand) { + private onCommandReceived = (command: Command.PeerCommand) => { switch (command.c) { case PeerCommandType.SegmentsAnnouncement: this.loadedSegments = new Set(command.l); @@ -50,15 +64,18 @@ export class Peer extends PeerBase { break; case PeerCommandType.SegmentRequest: + this.peerInterface.stopUploadingSegmentData(); this.eventHandlers.onSegmentRequested(this, command.i, command.b); break; case PeerCommandType.SegmentData: { - const request = this.requestContext?.request; - this.requestContext?.controls.firstBytesReceived(); + if (!this.downloadingContext) break; + this.downloadingContext.isSegmentDataCommandReceived = true; + const { request, controls } = this.downloadingContext; + controls.firstBytesReceived(); if ( - request?.segment.externalId === command.i && + request.segment.externalId === command.i && request.totalBytes === undefined ) { request.setTotalBytes(command.s); @@ -67,40 +84,39 @@ export class Peer extends PeerBase { break; case PeerCommandType.SegmentAbsent: - if (this.requestContext?.request.segment.externalId === command.i) { + if (this.downloadingContext?.request.segment.externalId === command.i) { this.cancelSegmentDownloading("peer-segment-absent"); this.loadedSegments.delete(command.i); } break; case PeerCommandType.CancelSegmentRequest: - this.cancelDataUploading(); + this.peerInterface.stopUploadingSegmentData(); break; } - } + }; - protected receiveSegmentChunk(chunk: Uint8Array): void { - if (!this.requestContext) return; - const { request, controls } = this.requestContext; + protected onSegmentChunkReceived = (chunk: Uint8Array) => { + if (!this.downloadingContext?.isSegmentDataCommandReceived) return; + const { request, controls } = this.downloadingContext; controls.addLoadedChunk(chunk); + if (request.totalBytes === undefined) return; if (request.loadedBytes === request.totalBytes) { controls.completeOnSuccess(); - this.requestContext = undefined; - } else if ( - request.totalBytes !== undefined && - request.loadedBytes > request.totalBytes - ) { + this.downloadingContext = undefined; + } else if (request.loadedBytes > request.totalBytes) { this.cancelSegmentDownloading("peer-response-bytes-mismatch"); } - } + }; downloadSegment(segmentRequest: Request) { - if (this.requestContext) { - throw new Error("Segment already is downloading"); + if (this.downloadingContext) { + throw new Error("Some segment already is downloading"); } - this.requestContext = { + this.downloadingContext = { request: segmentRequest, + isSegmentDataCommandReceived: false, controls: segmentRequest.start( { type: "p2p", peerId: this.id }, { @@ -115,14 +131,14 @@ export class Peer extends PeerBase { i: segmentRequest.segment.externalId, }; if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes; - this.sendCommand(command); + this.peerInterface.sendCommand(command); } private abortSegmentDownloading = () => { - if (!this.requestContext) return; - const { request } = this.requestContext; + if (!this.downloadingContext) return; + const { request } = this.downloadingContext; this.sendCancelSegmentRequestCommand(request.segment); - this.requestContext = undefined; + this.downloadingContext = undefined; }; async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { @@ -132,9 +148,11 @@ export class Peer extends PeerBase { i: segmentExternalId, s: data.byteLength, }; - this.sendCommand(command); + this.peerInterface.sendCommand(command); try { - await this.splitDataToChunksAndUploadAsync(data as Uint8Array); + await this.peerInterface.splitSegmentDataToChunksAndUploadAsync( + data as Uint8Array + ); this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`); } catch (err) { this.logger(`cancel segment uploading ${segmentExternalId}`); @@ -142,16 +160,16 @@ export class Peer extends PeerBase { } private cancelSegmentDownloading(type: PeerRequestErrorType) { - if (!this.requestContext) return; - const { request, controls } = this.requestContext; - const { segment } = request; - this.logger(`cancel segment request ${segment.externalId} (${type})`); - const error = new RequestError(type); + if (!this.downloadingContext) return; + const { request, controls } = this.downloadingContext; if (type === "peer-response-bytes-mismatch") { this.sendCancelSegmentRequestCommand(request.segment); } + const { segment } = request; + this.logger(`cancel segment request ${segment.externalId} (${type})`); + const error = new RequestError(type); controls.abortOnError(error); - this.requestContext = undefined; + this.downloadingContext = undefined; } sendSegmentsAnnouncementCommand( @@ -163,26 +181,30 @@ export class Peer extends PeerBase { p: httpLoadingSegmentsIds, l: loadedSegmentsIds, }; - this.sendCommand(command); + this.peerInterface.sendCommand(command); } sendSegmentAbsentCommand(segmentExternalId: number) { - this.sendCommand({ + this.peerInterface.sendCommand({ c: PeerCommandType.SegmentAbsent, i: segmentExternalId, }); } private sendCancelSegmentRequestCommand(segment: Segment) { - this.sendCommand({ + this.peerInterface.sendCommand({ c: PeerCommandType.CancelSegmentRequest, i: segment.externalId, }); } - destroy() { - super.destroy(); + destroy = () => { + this.peerInterface.destroy(); this.cancelSegmentDownloading("peer-closed"); this.eventHandlers.onPeerClosed(this); + }; + + static getPeerIdFromConnection(connection: PeerConnection) { + return Utils.hexToUtf8(connection.id); } } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index a36cbef9..a63f7eae 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -71,13 +71,24 @@ export class Request { private readonly playback: Playback, private readonly settings: StreamUtils.PlaybackTimeWindowsSettings ) { - this.id = Request.getRequestItemId(segment); + this.id = Request.getRequestItemId(this.segment); + const { byteRange } = this.segment; + if (byteRange) { + const { end, start } = byteRange; + this._totalBytes = end - start + 1; + } this.notReceivingBytesTimeout = new Timeout(this.abortOnTimeout); const { type } = this.segment.stream; this._logger = debug(`core:request-${type}`); } + clearLoadedBytes() { + this._loadedBytes = 0; + this.bytes = []; + this._totalBytes = undefined; + } + get status() { return this._status; } @@ -136,10 +147,14 @@ export class Request { } ): RequestControls { if (this._status === "succeed") { - throw new Error("Request has been already succeed."); + throw new Error( + `Request ${this.segment.externalId} has been already succeed.` + ); } if (this._status === "loading") { - throw new Error("Request has been already started."); + throw new Error( + `Request ${this.segment.externalId} has been already started.` + ); } this._status = "loading"; diff --git a/packages/p2p-media-loader-shaka/src/stream-utils.ts b/packages/p2p-media-loader-shaka/src/stream-utils.ts index a0a955a1..3b45b099 100644 --- a/packages/p2p-media-loader-shaka/src/stream-utils.ts +++ b/packages/p2p-media-loader-shaka/src/stream-utils.ts @@ -19,6 +19,7 @@ export function createSegment({ return { localId: localId ?? getSegmentLocalId(url, byteRange), externalId, + byteRange, url, startTime, endTime, From a0f64efe010c07028935f85da79f181072a99774 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Thu, 21 Dec 2023 20:17:02 +0200 Subject: [PATCH 02/11] Rename file. --- .../src/p2p/{peer-base.ts => peer-interface.ts} | 0 packages/p2p-media-loader-core/src/p2p/peer.ts | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename packages/p2p-media-loader-core/src/p2p/{peer-base.ts => peer-interface.ts} (100%) diff --git a/packages/p2p-media-loader-core/src/p2p/peer-base.ts b/packages/p2p-media-loader-core/src/p2p/peer-interface.ts similarity index 100% rename from packages/p2p-media-loader-core/src/p2p/peer-base.ts rename to packages/p2p-media-loader-core/src/p2p/peer-interface.ts diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index ac22edd2..b2369f92 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -1,5 +1,5 @@ import { PeerConnection } from "bittorrent-tracker"; -import { PeerInterface, PeerSettings } from "./peer-base"; +import { PeerInterface, PeerSettings } from "./peer-interface"; import { Request, RequestControls, From d66505335dbd0ac7bdcbd8e4b407da4605b2d1c6 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Thu, 21 Dec 2023 20:54:33 +0200 Subject: [PATCH 03/11] Add peer destroy logger. Fix type error. --- packages/p2p-media-loader-core/src/p2p/peer.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index b2369f92..7209b19e 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -22,7 +22,7 @@ type PeerEventHandlers = { }; export class Peer { - private readonly id: string; + readonly id: string; private readonly peerInterface; private downloadingContext?: { request: Request; @@ -201,6 +201,7 @@ export class Peer { destroy = () => { this.peerInterface.destroy(); this.cancelSegmentDownloading("peer-closed"); + this.logger(`peer closed ${this.id}`); this.eventHandlers.onPeerClosed(this); }; From 4ec4ee22b39e9536428051ea8a6d355efca26dc4 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Sun, 24 Dec 2023 13:50:03 +0200 Subject: [PATCH 04/11] Add errors handling. --- packages/p2p-media-loader-core/src/core.ts | 2 + .../src/hybrid-loader.ts | 125 ++++++++++++------ .../src/p2p/peer-interface.ts | 6 +- .../p2p-media-loader-core/src/p2p/peer.ts | 32 +++-- .../src/request-container.ts | 1 + packages/p2p-media-loader-core/src/request.ts | 95 +++++++++---- packages/p2p-media-loader-core/src/types.d.ts | 2 + 7 files changed, 179 insertions(+), 84 deletions(-) diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index 3aab237c..d03b9700 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -28,6 +28,8 @@ export class Core { p2pNotReceivingBytesTimeoutMs: 1000, p2pLoaderDestroyTimeoutMs: 30 * 1000, httpNotReceivingBytesTimeoutMs: 1000, + maxHttpFailedDownloadAttempts: 3, + maxPeerNotReceivingBytesTimeoutErrors: 3, }; private readonly bandwidthApproximator = new BandwidthApproximator(); private segmentStorage?: SegmentsMemoryStorage; diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index ff2edc58..19af5f53 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -97,9 +97,8 @@ export class HybridLoader { } } else { const request = this.requests.getOrCreateRequest(segment); - request.setOrResolveEngineCallbacks(callbacks); + request.setEngineCallbacks(callbacks); } - this.requestProcessQueueMicrotask(); } @@ -125,6 +124,73 @@ export class HybridLoader { }); }; + private processRequests(queueSegmentIds: Set) { + const { stream } = this.lastRequestedSegment; + const { maxHttpFailedDownloadAttempts } = this.settings; + for (const request of this.requests.items()) { + const { + type, + status, + segment, + isCheckedByProcessQueue, + isSegmentRequestedByEngine, + } = request; + + if (!type) continue; + + switch (status) { + case "loading": + if ( + !isSegmentRequestedByEngine && + !queueSegmentIds.has(segment.localId) + ) { + request.abortFromProcessQueue(); + this.requests.remove(request); + } + break; + + case "succeed": + if (!request.data) break; + if (type === "http") { + this.p2pLoaders.currentLoader.broadcastAnnouncement(); + } + request.resolveEngineCallbacksSuccessfully(); + void this.segmentStorage.storeSegment(request.segment, request.data); + this.eventHandlers?.onSegmentLoaded?.(request.data.byteLength, type); + this.requests.remove(request); + break; + + case "failed": + if (type === "http" && !isCheckedByProcessQueue) { + this.p2pLoaders.currentLoader.broadcastAnnouncement(); + } + if ( + !isSegmentRequestedByEngine && + !stream.segments.has(request.segment.localId) + ) { + this.requests.remove(request); + } + if ( + request.failedAttempts.httpAttemptsCount >= + maxHttpFailedDownloadAttempts && + isSegmentRequestedByEngine + ) { + request.resolveEngineCallbacksWithError(); + } + break; + + case "not-started": + this.requests.remove(request); + break; + + case "aborted": + this.requests.remove(request); + break; + } + request.markCheckedByProcessQueue(); + } + } + private processQueue() { const { queue, queueSegmentIds } = QueueUtils.generateQueue({ lastRequestedSegment: this.lastRequestedSegment, @@ -137,48 +203,13 @@ export class HybridLoader { ); }, }); + this.processRequests(queueSegmentIds); - for (const request of this.requests.items()) { - if (request.status === "loading") { - if ( - !request.isSegmentRequestedByEngine && - !queueSegmentIds.has(request.segment.localId) - ) { - request.abortFromProcessQueue(); - this.requests.remove(request); - } - continue; - } - - if (request.status === "succeed") { - const { type, data } = request; - if (!type || !data) continue; - if (type === "http") { - this.p2pLoaders.currentLoader.broadcastAnnouncement(); - } - void this.segmentStorage.storeSegment(request.segment, data); - this.eventHandlers?.onSegmentLoaded?.(data.byteLength, type); - this.requests.remove(request); - continue; - } - - if (request.status === "failed") { - if (request.type === "http") { - this.p2pLoaders.currentLoader.broadcastAnnouncement(); - } - continue; - } - - if ( - (request.status === "not-started" || request.status === "aborted") && - !request.isSegmentRequestedByEngine - ) { - this.requests.remove(request); - } - } - - const { simultaneousHttpDownloads, simultaneousP2PDownloads } = - this.settings; + const { + simultaneousHttpDownloads, + simultaneousP2PDownloads, + maxHttpFailedDownloadAttempts, + } = this.settings; for (const item of queue) { const { statuses, segment } = item; @@ -186,6 +217,14 @@ export class HybridLoader { if (statuses.isHighDemand) { if (request?.type === "http" && request.status === "loading") continue; + if ( + request?.type === "http" && + request.status === "failed" && + request.failedAttempts.httpAttemptsCount >= + maxHttpFailedDownloadAttempts + ) { + break; + } if (this.requests.executingHttpCount < simultaneousHttpDownloads) { void this.loadThroughHttp(segment); diff --git a/packages/p2p-media-loader-core/src/p2p/peer-interface.ts b/packages/p2p-media-loader-core/src/p2p/peer-interface.ts index 2ebf2498..12a136b6 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer-interface.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer-interface.ts @@ -5,7 +5,9 @@ import { Settings } from "../types"; export type PeerSettings = Pick< Settings, - "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" + | "p2pNotReceivingBytesTimeoutMs" + | "webRtcMaxMessageSize" + | "maxPeerNotReceivingBytesTimeoutErrors" >; export class PeerInterface { @@ -36,6 +38,7 @@ export class PeerInterface { private onPeerClosed = () => { this.destroy(); + this.eventHandlers.onDestroy(); }; private onConnectionError = (error: { code: string }) => { @@ -121,7 +124,6 @@ export class PeerInterface { destroy() { this.connection.destroy(); - this.eventHandlers.onDestroy(); } } diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 7209b19e..cb9448bf 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -5,6 +5,7 @@ import { RequestControls, RequestError, PeerRequestErrorType, + RequestInnerErrorType, } from "../request"; import * as Command from "./commands"; import { Segment } from "../types"; @@ -31,6 +32,9 @@ export class Peer { }; private loadedSegments = new Set(); private httpLoadingSegments = new Set(); + private downloadingErrors: RequestError< + PeerRequestErrorType | RequestInnerErrorType + >[] = []; private logger = debug("core:peer"); constructor( @@ -106,6 +110,7 @@ export class Peer { controls.completeOnSuccess(); this.downloadingContext = undefined; } else if (request.loadedBytes > request.totalBytes) { + request.clearLoadedBytes(); this.cancelSegmentDownloading("peer-response-bytes-mismatch"); } }; @@ -120,9 +125,23 @@ export class Peer { controls: segmentRequest.start( { type: "p2p", peerId: this.id }, { - abort: this.abortSegmentDownloading, notReceivingBytesTimeoutMs: this.settings.p2pNotReceivingBytesTimeoutMs, + abort: (error) => { + if (!this.downloadingContext) return; + const { request } = this.downloadingContext; + this.sendCancelSegmentRequestCommand(request.segment); + this.downloadingContext = undefined; + this.downloadingErrors.push(error); + + const timeoutErrors = this.downloadingErrors.filter( + (error) => error.type === "bytes-receiving-timeout" + ); + const { maxPeerNotReceivingBytesTimeoutErrors } = this.settings; + if (timeoutErrors.length >= maxPeerNotReceivingBytesTimeoutErrors) { + this.peerInterface.destroy(); + } + }, } ), }; @@ -134,13 +153,6 @@ export class Peer { this.peerInterface.sendCommand(command); } - private abortSegmentDownloading = () => { - if (!this.downloadingContext) return; - const { request } = this.downloadingContext; - this.sendCancelSegmentRequestCommand(request.segment); - this.downloadingContext = undefined; - }; - async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { this.logger(`send segment ${segmentExternalId} to ${this.id}`); const command: Command.PeerSendSegmentCommand = { @@ -170,6 +182,7 @@ export class Peer { const error = new RequestError(type); controls.abortOnError(error); this.downloadingContext = undefined; + this.downloadingErrors.push(error); } sendSegmentsAnnouncementCommand( @@ -199,10 +212,9 @@ export class Peer { } destroy = () => { - this.peerInterface.destroy(); this.cancelSegmentDownloading("peer-closed"); - this.logger(`peer closed ${this.id}`); this.eventHandlers.onPeerClosed(this); + this.logger(`peer closed ${this.id}`); }; static getPeerIdFromConnection(connection: PeerConnection) { diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts index f5f56667..14d332e0 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/request-container.ts @@ -72,6 +72,7 @@ export class RequestsContainer { if (request.type === "p2p") yield request; } } + isHybridLoaderRequested(segment: Segment): boolean { return !!this.requests.get(segment)?.type; } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index a63f7eae..f914f705 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -53,7 +53,7 @@ export class Request { readonly id: string; private _engineCallbacks?: EngineCallbacks; private currentAttempt?: RequestAttempt; - private _failedAttempts: RequestAttempt[] = []; + private _failedAttempts = new FailedRequestAttempts(); private finalData?: ArrayBuffer; private bytes: Uint8Array[] = []; private _loadedBytes = 0; @@ -61,8 +61,11 @@ export class Request { private _status: RequestStatus = "not-started"; private progress?: LoadProgress; private notReceivingBytesTimeout: Timeout; - private _abortRequestCallback?: (errorType: RequestInnerErrorType) => void; + private _abortRequestCallback?: ( + error: RequestError + ) => void; private readonly _logger: debug.Debugger; + private _isCheckedByProcessQueue = false; constructor( readonly segment: Segment, @@ -93,6 +96,11 @@ export class Request { return this._status; } + private setStatus(status: RequestStatus) { + this._status = status; + this._isCheckedByProcessQueue = false; + } + get isSegmentRequestedByEngine(): boolean { return !!this._engineCallbacks; } @@ -115,21 +123,24 @@ export class Request { return this.finalData; } - get loadedPercent() { - if (!this._totalBytes) return; - return Utils.getPercent(this.loadedBytes, this._totalBytes); + get failedAttempts() { + return this._failedAttempts; + } + + get isCheckedByProcessQueue() { + return this._isCheckedByProcessQueue; } - get failedAttempts(): ReadonlyArray> { - return this._failedAttempts; + markCheckedByProcessQueue() { + this._isCheckedByProcessQueue = true; } - setOrResolveEngineCallbacks(callbacks: EngineCallbacks) { + setEngineCallbacks(callbacks: EngineCallbacks) { if (this._engineCallbacks) { throw new Error("Segment is already requested by engine"); } + this._isCheckedByProcessQueue = false; this._engineCallbacks = callbacks; - if (this.finalData) this.resolveEngineCallbacksSuccessfully(this.finalData); } setTotalBytes(value: number) { @@ -143,7 +154,7 @@ export class Request { requestData: StartRequestParameters, controls: { notReceivingBytesTimeoutMs?: number; - abort: (errorType: RequestInnerErrorType) => void; + abort: (errorType: RequestError) => void; } ): RequestControls { if (this._status === "succeed") { @@ -157,7 +168,7 @@ export class Request { ); } - this._status = "loading"; + this.setStatus("loading"); this.currentAttempt = { ...requestData }; this.progress = { startFromByte: this._loadedBytes, @@ -190,15 +201,22 @@ export class Request { }; } - private resolveEngineCallbacksSuccessfully(data: ArrayBuffer) { + resolveEngineCallbacksSuccessfully() { + if (!this.finalData) return; this._engineCallbacks?.onSuccess({ - data, + data: this.finalData, bandwidth: this.bandwidthApproximator.getBandwidth(), }); this._engineCallbacks = undefined; } + resolveEngineCallbacksWithError() { + this._engineCallbacks?.onError(new CoreRequestError("failed")); + this._engineCallbacks = undefined; + } + abortFromEngine() { + if (this._status !== "loading") return; this._engineCallbacks?.onError(new CoreRequestError("aborted")); this._engineCallbacks = undefined; this.requestProcessQueueCallback(); @@ -206,8 +224,8 @@ export class Request { abortFromProcessQueue() { this.throwErrorIfNotLoadingStatus(); - this._status = "aborted"; - this._abortRequestCallback?.("abort"); + this.setStatus("aborted"); + this._abortRequestCallback?.(new RequestError("abort")); this._abortRequestCallback = undefined; this.currentAttempt = undefined; this.notReceivingBytesTimeout.clear(); @@ -217,12 +235,12 @@ export class Request { this.throwErrorIfNotLoadingStatus(); if (!this.currentAttempt) return; - this._status = "failed"; + this.setStatus("failed"); const error = new RequestError("bytes-receiving-timeout"); - this._abortRequestCallback?.(error.type); + this._abortRequestCallback?.(error); this.currentAttempt.error = error; - this._failedAttempts.push(this.currentAttempt); + this._failedAttempts.add(this.currentAttempt); this.notReceivingBytesTimeout.clear(); this.requestProcessQueueCallback(); }; @@ -231,9 +249,9 @@ export class Request { this.throwErrorIfNotLoadingStatus(); if (!this.currentAttempt) return; - this._status = "failed"; + this.setStatus("failed"); this.currentAttempt.error = error; - this._failedAttempts.push(this.currentAttempt); + this._failedAttempts.add(this.currentAttempt); this.notReceivingBytesTimeout.clear(); this.requestProcessQueueCallback(); }; @@ -244,10 +262,10 @@ export class Request { this.notReceivingBytesTimeout.clear(); this.finalData = Utils.joinChunks(this.bytes); - this._status = "succeed"; + this.setStatus("succeed"); this._totalBytes = this._loadedBytes; - this.resolveEngineCallbacksSuccessfully(this.finalData); + this.resolveEngineCallbacksSuccessfully(); this.logger( `${this.currentAttempt.type} ${this.segment.externalId} succeed` ); @@ -287,6 +305,32 @@ export class Request { } } +class FailedRequestAttempts { + private readonly attempts: RequestAttempt[] = []; + private _httpAttemptsCount = 0; + private _p2pAttemptsCount = 0; + + add(attempt: RequestAttempt) { + this.attempts.push(attempt); + if (attempt.type === "http") this._httpAttemptsCount++; + else this._p2pAttemptsCount++; + } + + get httpAttemptsCount() { + return this._httpAttemptsCount; + } + + get p2pAttemptsCount() { + return this._p2pAttemptsCount; + } + + *p2pAttempts(): Generator, void> { + for (const attempt of this.attempts) { + if (attempt.type === "p2p") yield attempt as Required; + } + } +} + const requestInnerErrorTypes = ["abort", "bytes-receiving-timeout"] as const; const httpRequestErrorTypes = ["fetch-error"] as const; @@ -312,13 +356,6 @@ export class RequestError< constructor(readonly type: T, message?: string) { super(message); } - - static isRequestInnerErrorType( - error: RequestError - ): error is RequestError { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - return requestInnerErrorTypes.includes(error.type as any); - } } export class CoreRequestError extends Error { diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index e03ab496..bc8c46f3 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -59,6 +59,8 @@ export type Settings = { p2pNotReceivingBytesTimeoutMs: number; p2pLoaderDestroyTimeoutMs: number; httpNotReceivingBytesTimeoutMs: number; + maxHttpFailedDownloadAttempts: number; + maxPeerNotReceivingBytesTimeoutErrors: number; }; export type CoreEventHandlers = { From c3ea983108793cc4243e6a3dd908c7018368e32f Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Thu, 28 Dec 2023 13:27:09 +0200 Subject: [PATCH 05/11] Rename file. --- .../src/p2p/{peer-interface.ts => peer-protocol.ts} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename packages/p2p-media-loader-core/src/p2p/{peer-interface.ts => peer-protocol.ts} (99%) diff --git a/packages/p2p-media-loader-core/src/p2p/peer-interface.ts b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts similarity index 99% rename from packages/p2p-media-loader-core/src/p2p/peer-interface.ts rename to packages/p2p-media-loader-core/src/p2p/peer-protocol.ts index 12a136b6..f97d6d37 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer-interface.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts @@ -10,7 +10,7 @@ export type PeerSettings = Pick< | "maxPeerNotReceivingBytesTimeoutErrors" >; -export class PeerInterface { +export class PeerProtocol { private commandChunks?: Command.BinaryCommandChunksJoiner; private uploadingContext?: { stopUploading: () => void }; From e65b6e57a2f76b069e1cb4b4ba5413692877d328 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 12:18:41 +0200 Subject: [PATCH 06/11] Refactor http loader. Throw error and clear already loaded data on bytes mismatch. --- packages/p2p-media-loader-core/src/core.ts | 4 +- .../p2p-media-loader-core/src/http-loader.ts | 211 ++++++++++-------- .../src/hybrid-loader.ts | 21 +- .../src/p2p/peer-protocol.ts | 4 +- .../p2p-media-loader-core/src/p2p/peer.ts | 36 ++- packages/p2p-media-loader-core/src/request.ts | 35 +-- packages/p2p-media-loader-core/src/types.d.ts | 4 +- 7 files changed, 165 insertions(+), 150 deletions(-) diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index d03b9700..39daa6cc 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -28,8 +28,8 @@ export class Core { p2pNotReceivingBytesTimeoutMs: 1000, p2pLoaderDestroyTimeoutMs: 30 * 1000, httpNotReceivingBytesTimeoutMs: 1000, - maxHttpFailedDownloadAttempts: 3, - maxPeerNotReceivingBytesTimeoutErrors: 3, + httpErrorRetries: 3, + p2pErrorRetries: 3, }; private readonly bandwidthApproximator = new BandwidthApproximator(); private segmentStorage?: SegmentsMemoryStorage; diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index 722b83ae..e0995a06 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -1,70 +1,139 @@ import { Settings } from "./types"; -import { Request, RequestError, HttpRequestErrorType } from "./request"; - -export async function fulfillHttpSegmentRequest( - request: Request, - settings: Pick -) { - const requestHeaders = new Headers(); - const { segment, loadedBytes: alreadyLoadedBytes } = request; - const { url, byteRange } = segment; - - let byteFrom = byteRange?.start; - const byteTo = byteRange?.end; - if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes; - - if (byteFrom !== undefined) { - const byteRangeString = `bytes=${byteFrom}-${byteTo ?? ""}`; - requestHeaders.set("Range", byteRangeString); - } +import { + Request, + RequestError, + HttpRequestErrorType, + RequestControls, +} from "./request"; + +type HttpSettings = Pick; - const abortController = new AbortController(); - const requestControls = request.start( - { type: "http" }, - { - abort: () => abortController.abort("abort"), - notReceivingBytesTimeoutMs: settings.httpNotReceivingBytesTimeoutMs, +export class HttpRequestExecutor { + private readonly requestControls: RequestControls; + private readonly requestHeaders = new Headers(); + private readonly abortController = new AbortController(); + private readonly expectedBytesLength?: number; + private readonly byteRange?: { start: number; end?: number }; + + constructor( + private readonly request: Request, + private readonly settings: HttpSettings + ) { + const { byteRange } = this.request.segment; + if (byteRange) this.byteRange = { ...byteRange }; + + if (request.loadedBytes !== 0) { + this.byteRange = this.byteRange ?? { start: 0 }; + this.byteRange.start = this.byteRange.start + request.loadedBytes; } - ); - try { - const fetchResponse = await window.fetch(url, { - headers: requestHeaders, - signal: abortController.signal, - }); - if (!fetchResponse.ok) { - throw new RequestError("fetch-error", fetchResponse.statusText); + if (this.request.totalBytes) { + this.expectedBytesLength = + this.request.totalBytes - this.request.loadedBytes; } - if (!fetchResponse.body) return; - requestControls.firstBytesReceived(); - - if ( - byteFrom !== undefined && - (fetchResponse.status !== 206 || - !isResponseWithRequestedContentRange(fetchResponse, byteFrom, byteTo)) - ) { - request.clearLoadedBytes(); + + if (this.byteRange) { + const { start, end } = this.byteRange; + this.requestHeaders.set("Range", `bytes=${start}-${end ?? ""}`); } - if (request.totalBytes === undefined) { - const totalBytesString = fetchResponse.headers.get("Content-Length"); - if (totalBytesString) request.setTotalBytes(+totalBytesString); + const { httpNotReceivingBytesTimeoutMs } = this.settings; + this.requestControls = this.request.start( + { type: "http" }, + { + abort: () => this.abortController.abort("abort"), + notReceivingBytesTimeoutMs: httpNotReceivingBytesTimeoutMs, + } + ); + void this.fetch(); + } + + private async fetch() { + const { segment } = this.request; + try { + const response = await window.fetch(segment.url, { + headers: this.requestHeaders, + signal: this.abortController.signal, + }); + this.handleResponseHeaders(response); + + if (!response.body) return; + const { requestControls } = this; + requestControls.firstBytesReceived(); + + const reader = response.body.getReader(); + for await (const chunk of readStream(reader)) { + this.requestControls.addLoadedChunk(chunk); + } + requestControls.completeOnSuccess(); + } catch (error) { + this.handleError(error); } + } - const reader = fetchResponse.body.getReader(); - for await (const chunk of readStream(reader)) { - requestControls.addLoadedChunk(chunk); + private handleResponseHeaders(response: Response) { + if (!response.ok) { + if (response.status === 406) { + this.request.clearLoadedBytes(); + throw new RequestError("http-bytes-mismatch", response.statusText); + } else { + throw new RequestError("http-error", response.statusText); + } } - requestControls.completeOnSuccess(); - } catch (error) { + + const { byteRange } = this; + if (byteRange) { + if (response.status === 200) { + this.request.clearLoadedBytes(); + } else { + if (response.status !== 206) { + this.request.clearLoadedBytes(); + throw new RequestError("http-bytes-mismatch", response.statusText); + } + const contentLengthHeader = response.headers.get("Content-Length"); + if ( + contentLengthHeader && + this.expectedBytesLength !== +contentLengthHeader + ) { + this.request.clearLoadedBytes(); + throw new RequestError("http-bytes-mismatch", response.statusText); + } + + const contentRangeHeader = response.headers.get("Content-Range"); + const contentRange = contentRangeHeader + ? parseContentRangeHeader(contentRangeHeader) + : undefined; + if (contentRange) { + const { from, to, total } = contentRange; + if ( + (total !== undefined && this.request.totalBytes !== total) || + (from !== undefined && byteRange.start !== from) || + (to !== undefined && + byteRange.end !== undefined && + byteRange.end !== to) + ) { + this.request.clearLoadedBytes(); + throw new RequestError("http-bytes-mismatch", response.statusText); + } + } + } + } + + if (response.status === 200 && this.request.totalBytes === undefined) { + const contentLengthHeader = response.headers.get("Content-Length"); + if (contentLengthHeader) this.request.setTotalBytes(+contentLengthHeader); + } + } + + private handleError(error: unknown) { if (error instanceof Error) { if (error.name !== "abort") return; const httpLoaderError: RequestError = !( error instanceof RequestError ) - ? new RequestError("fetch-error", error.message) + ? new RequestError("http-error", error.message) : error; - requestControls.abortOnError(httpLoaderError); + this.requestControls.abortOnError(httpLoaderError); } } } @@ -79,7 +148,7 @@ async function* readStream( } } -function getValueFromContentRangeHeader(headerValue: string) { +function parseContentRangeHeader(headerValue: string) { const match = headerValue .trim() .match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/); @@ -92,41 +161,3 @@ function getValueFromContentRangeHeader(headerValue: string) { total: total ? parseInt(total) : undefined, }; } - -function isResponseWithRequestedContentRange( - response: Response, - requestedFromByte: number, - requestedToByte?: number -): boolean { - const requestedBytesAmount = - requestedToByte !== undefined - ? requestedToByte - requestedFromByte + 1 - : undefined; - - const { headers } = response; - const contentLengthHeader = headers.get("Content-Length"); - const contentLength = contentLengthHeader && parseInt(contentLengthHeader); - - if ( - contentLength && - requestedBytesAmount !== undefined && - requestedBytesAmount !== contentLength - ) { - return false; - } - - const contentRangeHeader = headers.get("Content-Range"); - const contentRange = - contentRangeHeader && getValueFromContentRangeHeader(contentRangeHeader); - if (!contentRange) return true; - const { from, to } = contentRange; - if (from !== requestedFromByte) return false; - if ( - to !== undefined && - requestedToByte !== undefined && - to !== requestedToByte - ) { - return false; - } - return true; -} diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 6153e6f9..dfe688a4 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -1,5 +1,5 @@ import { Segment, StreamWithSegments } from "./index"; -import { fulfillHttpSegmentRequest } from "./http-loader"; +import { HttpRequestExecutor } from "./http-loader"; import { SegmentsMemoryStorage } from "./segments-storage"; import { Settings, CoreEventHandlers, Playback } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; @@ -126,13 +126,13 @@ export class HybridLoader { private processRequests(queueSegmentIds: Set) { const { stream } = this.lastRequestedSegment; - const { maxHttpFailedDownloadAttempts } = this.settings; + const { httpErrorRetries } = this.settings; for (const request of this.requests.items()) { const { type, status, segment, - isCheckedByProcessQueue, + isHandledByProcessQueue, isSegmentRequestedByEngine, } = request; @@ -161,7 +161,7 @@ export class HybridLoader { break; case "failed": - if (type === "http" && !isCheckedByProcessQueue) { + if (type === "http" && !isHandledByProcessQueue) { this.p2pLoaders.currentLoader.broadcastAnnouncement(); } if ( @@ -171,8 +171,7 @@ export class HybridLoader { this.requests.remove(request); } if ( - request.failedAttempts.httpAttemptsCount >= - maxHttpFailedDownloadAttempts && + request.failedAttempts.httpAttemptsCount >= httpErrorRetries && isSegmentRequestedByEngine ) { request.resolveEngineCallbacksWithError(); @@ -187,11 +186,12 @@ export class HybridLoader { this.requests.remove(request); break; } - request.markCheckedByProcessQueue(); + request.markHandledByProcessQueue(); } } private processQueue() { + console.log("process queue"); const { queue, queueSegmentIds } = QueueUtils.generateQueue({ lastRequestedSegment: this.lastRequestedSegment, playback: this.playback, @@ -208,7 +208,7 @@ export class HybridLoader { const { simultaneousHttpDownloads, simultaneousP2PDownloads, - maxHttpFailedDownloadAttempts, + httpErrorRetries, } = this.settings; for (const item of queue) { @@ -220,8 +220,7 @@ export class HybridLoader { if ( request?.type === "http" && request.status === "failed" && - request.failedAttempts.httpAttemptsCount >= - maxHttpFailedDownloadAttempts + request.failedAttempts.httpAttemptsCount >= httpErrorRetries ) { break; } @@ -287,7 +286,7 @@ export class HybridLoader { private async loadThroughHttp(segment: Segment) { const request = this.requests.getOrCreateRequest(segment); - void fulfillHttpSegmentRequest(request, this.settings); + new HttpRequestExecutor(request, this.settings); this.p2pLoaders.currentLoader.broadcastAnnouncement(); } diff --git a/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts index f97d6d37..eda3b608 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts @@ -5,9 +5,7 @@ import { Settings } from "../types"; export type PeerSettings = Pick< Settings, - | "p2pNotReceivingBytesTimeoutMs" - | "webRtcMaxMessageSize" - | "maxPeerNotReceivingBytesTimeoutErrors" + "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" | "p2pErrorRetries" >; export class PeerProtocol { diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index cb9448bf..7d8e3c0c 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -1,5 +1,5 @@ import { PeerConnection } from "bittorrent-tracker"; -import { PeerInterface, PeerSettings } from "./peer-interface"; +import { PeerProtocol, PeerSettings } from "./peer-protocol"; import { Request, RequestControls, @@ -24,7 +24,7 @@ type PeerEventHandlers = { export class Peer { readonly id: string; - private readonly peerInterface; + private readonly peerProtocol; private downloadingContext?: { request: Request; controls: RequestControls; @@ -43,7 +43,7 @@ export class Peer { private readonly settings: PeerSettings ) { this.id = Peer.getPeerIdFromConnection(connection); - this.peerInterface = new PeerInterface(connection, settings, { + this.peerProtocol = new PeerProtocol(connection, settings, { onSegmentChunkReceived: this.onSegmentChunkReceived, onCommandReceived: this.onCommandReceived, onDestroy: this.destroy, @@ -68,20 +68,18 @@ export class Peer { break; case PeerCommandType.SegmentRequest: - this.peerInterface.stopUploadingSegmentData(); + this.peerProtocol.stopUploadingSegmentData(); this.eventHandlers.onSegmentRequested(this, command.i, command.b); break; case PeerCommandType.SegmentData: { if (!this.downloadingContext) break; - this.downloadingContext.isSegmentDataCommandReceived = true; const { request, controls } = this.downloadingContext; + if (request.segment.externalId !== command.i) break; + this.downloadingContext.isSegmentDataCommandReceived = true; controls.firstBytesReceived(); - if ( - request.segment.externalId === command.i && - request.totalBytes === undefined - ) { + if (request.totalBytes === undefined) { request.setTotalBytes(command.s); } } @@ -95,7 +93,7 @@ export class Peer { break; case PeerCommandType.CancelSegmentRequest: - this.peerInterface.stopUploadingSegmentData(); + this.peerProtocol.stopUploadingSegmentData(); break; } }; @@ -137,9 +135,9 @@ export class Peer { const timeoutErrors = this.downloadingErrors.filter( (error) => error.type === "bytes-receiving-timeout" ); - const { maxPeerNotReceivingBytesTimeoutErrors } = this.settings; - if (timeoutErrors.length >= maxPeerNotReceivingBytesTimeoutErrors) { - this.peerInterface.destroy(); + const { p2pErrorRetries } = this.settings; + if (timeoutErrors.length >= p2pErrorRetries) { + this.peerProtocol.destroy(); } }, } @@ -150,7 +148,7 @@ export class Peer { i: segmentRequest.segment.externalId, }; if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes; - this.peerInterface.sendCommand(command); + this.peerProtocol.sendCommand(command); } async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { @@ -160,9 +158,9 @@ export class Peer { i: segmentExternalId, s: data.byteLength, }; - this.peerInterface.sendCommand(command); + this.peerProtocol.sendCommand(command); try { - await this.peerInterface.splitSegmentDataToChunksAndUploadAsync( + await this.peerProtocol.splitSegmentDataToChunksAndUploadAsync( data as Uint8Array ); this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`); @@ -194,18 +192,18 @@ export class Peer { p: httpLoadingSegmentsIds, l: loadedSegmentsIds, }; - this.peerInterface.sendCommand(command); + this.peerProtocol.sendCommand(command); } sendSegmentAbsentCommand(segmentExternalId: number) { - this.peerInterface.sendCommand({ + this.peerProtocol.sendCommand({ c: PeerCommandType.SegmentAbsent, i: segmentExternalId, }); } private sendCancelSegmentRequestCommand(segment: Segment) { - this.peerInterface.sendCommand({ + this.peerProtocol.sendCommand({ c: PeerCommandType.CancelSegmentRequest, i: segment.externalId, }); diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index 93d48d42..2657154c 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -65,7 +65,7 @@ export class Request { error: RequestError ) => void; private readonly _logger: debug.Debugger; - private _isCheckedByProcessQueue = false; + private _isHandledByProcessQueue = false; constructor( readonly segment: Segment, @@ -98,7 +98,7 @@ export class Request { private setStatus(status: RequestStatus) { this._status = status; - this._isCheckedByProcessQueue = false; + this._isHandledByProcessQueue = false; } get isSegmentRequestedByEngine(): boolean { @@ -127,19 +127,19 @@ export class Request { return this._failedAttempts; } - get isCheckedByProcessQueue() { - return this._isCheckedByProcessQueue; + get isHandledByProcessQueue() { + return this._isHandledByProcessQueue; } - markCheckedByProcessQueue() { - this._isCheckedByProcessQueue = true; + markHandledByProcessQueue() { + this._isHandledByProcessQueue = true; } setEngineCallbacks(callbacks: EngineCallbacks) { if (this._engineCallbacks) { throw new Error("Segment is already requested by engine"); } - this._isCheckedByProcessQueue = false; + this._isHandledByProcessQueue = false; this._engineCallbacks = callbacks; } @@ -312,33 +312,22 @@ export class Request { class FailedRequestAttempts { private readonly attempts: RequestAttempt[] = []; - private _httpAttemptsCount = 0; - private _p2pAttemptsCount = 0; add(attempt: RequestAttempt) { this.attempts.push(attempt); - if (attempt.type === "http") this._httpAttemptsCount++; - else this._p2pAttemptsCount++; } get httpAttemptsCount() { - return this._httpAttemptsCount; - } - - get p2pAttemptsCount() { - return this._p2pAttemptsCount; - } - - *p2pAttempts(): Generator, void> { - for (const attempt of this.attempts) { - if (attempt.type === "p2p") yield attempt as Required; - } + return this.attempts.reduce( + (sum, attempt) => (attempt.type === "http" ? sum + 1 : sum), + 0 + ); } } const requestInnerErrorTypes = ["abort", "bytes-receiving-timeout"] as const; -const httpRequestErrorTypes = ["fetch-error"] as const; +const httpRequestErrorTypes = ["http-error", "http-bytes-mismatch"] as const; const peerRequestErrorTypes = [ "peer-response-bytes-mismatch", diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index d4ad0669..78d36e21 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -59,8 +59,8 @@ export type Settings = { p2pNotReceivingBytesTimeoutMs: number; p2pLoaderDestroyTimeoutMs: number; httpNotReceivingBytesTimeoutMs: number; - maxHttpFailedDownloadAttempts: number; - maxPeerNotReceivingBytesTimeoutErrors: number; + httpErrorRetries: number; + p2pErrorRetries: number; }; export type CoreEventHandlers = { From f26233c1c02d9af0f980fad4f7483274f50d6f66 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 13:43:48 +0200 Subject: [PATCH 07/11] Add SegmentDataSendingCompleted command. --- .../src/hybrid-loader.ts | 1 - .../src/p2p/commands/commands.ts | 1 + .../src/p2p/commands/types.ts | 5 ++- .../p2p-media-loader-core/src/p2p/loader.ts | 2 +- .../p2p-media-loader-core/src/p2p/peer.ts | 35 ++++++++++++++++--- 5 files changed, 36 insertions(+), 8 deletions(-) diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index dfe688a4..9b9841a6 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -191,7 +191,6 @@ export class HybridLoader { } private processQueue() { - console.log("process queue"); const { queue, queueSegmentIds } = QueueUtils.generateQueue({ lastRequestedSegment: this.lastRequestedSegment, playback: this.playback, diff --git a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts index 30ddeca6..d5cdd6f9 100644 --- a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts +++ b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts @@ -61,6 +61,7 @@ export function serializePeerCommand( switch (command.c) { case PeerCommandType.CancelSegmentRequest: case PeerCommandType.SegmentAbsent: + case PeerCommandType.SegmentDataSendingCompleted: return serializePeerSegmentCommand(command, maxChunkSize); case PeerCommandType.SegmentRequest: return serializePeerSegmentRequestCommand(command, maxChunkSize); diff --git a/packages/p2p-media-loader-core/src/p2p/commands/types.ts b/packages/p2p-media-loader-core/src/p2p/commands/types.ts index 27e9b144..cbe2aba3 100644 --- a/packages/p2p-media-loader-core/src/p2p/commands/types.ts +++ b/packages/p2p-media-loader-core/src/p2p/commands/types.ts @@ -6,12 +6,15 @@ export enum PeerCommandType { SegmentsAnnouncement, SegmentRequest, SegmentData, + SegmentDataSendingCompleted, SegmentAbsent, CancelSegmentRequest, } export type PeerSegmentCommand = BasePeerCommand< - PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest + | PeerCommandType.SegmentAbsent + | PeerCommandType.CancelSegmentRequest + | PeerCommandType.SegmentDataSendingCompleted > & { i: number; // segment id }; diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index cfda5d49..c276ba8b 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -120,7 +120,7 @@ export class P2PLoader { return; } void peer.uploadSegmentData( - segmentExternalId, + segment, byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData ); }; diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 7d8e3c0c..ab98e4c9 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -81,6 +81,22 @@ export class Peer { controls.firstBytesReceived(); if (request.totalBytes === undefined) { request.setTotalBytes(command.s); + } else if (request.totalBytes - request.loadedBytes !== command.s) { + this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + request.clearLoadedBytes(); + } + } + break; + + case PeerCommandType.SegmentDataSendingCompleted: + if (this.downloadingContext?.request.segment.externalId === command.i) { + const { request, controls } = this.downloadingContext; + if (request.loadedBytes !== request.totalBytes) { + request.clearLoadedBytes(); + this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + } else { + controls.completeOnSuccess(); + this.downloadingContext = undefined; } } break; @@ -151,11 +167,12 @@ export class Peer { this.peerProtocol.sendCommand(command); } - async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { - this.logger(`send segment ${segmentExternalId} to ${this.id}`); + async uploadSegmentData(segment: Segment, data: ArrayBuffer) { + const { externalId } = segment; + this.logger(`send segment ${segment.externalId} to ${this.id}`); const command: Command.PeerSendSegmentCommand = { c: PeerCommandType.SegmentData, - i: segmentExternalId, + i: externalId, s: data.byteLength, }; this.peerProtocol.sendCommand(command); @@ -163,9 +180,10 @@ export class Peer { await this.peerProtocol.splitSegmentDataToChunksAndUploadAsync( data as Uint8Array ); - this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`); + this.sendSegmentDataSendingCompletedCommand(segment); + this.logger(`segment ${externalId} has been sent to ${this.id}`); } catch (err) { - this.logger(`cancel segment uploading ${segmentExternalId}`); + this.logger(`cancel segment uploading ${externalId}`); } } @@ -209,6 +227,13 @@ export class Peer { }); } + private sendSegmentDataSendingCompletedCommand(segment: Segment) { + this.peerProtocol.sendCommand({ + c: PeerCommandType.SegmentDataSendingCompleted, + i: segment.externalId, + }); + } + destroy = () => { this.cancelSegmentDownloading("peer-closed"); this.eventHandlers.onPeerClosed(this); From b7ab09f9a17db0854c35fea3cb6638e090ce4c14 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 14:22:40 +0200 Subject: [PATCH 08/11] Failed attempts clear interval. --- .../src/hybrid-loader.ts | 27 ++++++++++++++----- .../src/request-container.ts | 4 --- packages/p2p-media-loader-core/src/request.ts | 13 ++++++++- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 9b9841a6..a49ca462 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -12,6 +12,8 @@ import * as StreamUtils from "./utils/stream"; import * as Utils from "./utils/utils"; import debug from "debug"; +const FAILED_ATTEMPTS_CLEAR_INTERVAL = 60000; + export class HybridLoader { private readonly requests: RequestsContainer; private readonly p2pLoaders: P2PLoadersContainer; @@ -127,6 +129,7 @@ export class HybridLoader { private processRequests(queueSegmentIds: Set) { const { stream } = this.lastRequestedSegment; const { httpErrorRetries } = this.settings; + const now = performance.now(); for (const request of this.requests.items()) { const { type, @@ -187,6 +190,12 @@ export class HybridLoader { break; } request.markHandledByProcessQueue(); + if ( + now - request.failedAttempts.lastClearTimestamp > + FAILED_ATTEMPTS_CLEAR_INTERVAL + ) { + request.failedAttempts.clear(); + } } } @@ -294,7 +303,7 @@ export class HybridLoader { } private loadRandomThroughHttp() { - const { simultaneousHttpDownloads } = this.settings; + const { simultaneousHttpDownloads, httpErrorRetries } = this.settings; const p2pLoader = this.p2pLoaders.currentLoader; const connectedPeersAmount = p2pLoader.connectedPeersAmount; if ( @@ -307,11 +316,17 @@ export class HybridLoader { lastRequestedSegment: this.lastRequestedSegment, playback: this.playback, settings: this.settings, - skipSegment: (segment, statuses) => - !statuses.isHttpDownloadable || - this.segmentStorage.hasSegment(segment) || - this.requests.isHybridLoaderRequested(segment) || - p2pLoader.isLoadingOrLoadedBySomeone(segment), + skipSegment: (segment, statuses) => { + const request = this.requests.get(segment); + return ( + !statuses.isHttpDownloadable || + this.segmentStorage.hasSegment(segment) || + request?.type !== undefined || + (request?.failedAttempts.httpAttemptsCount ?? 0) >= + httpErrorRetries || + p2pLoader.isLoadingOrLoadedBySomeone(segment) + ); + }, }); if (!queue.length) return; const peersAmount = connectedPeersAmount + 1; diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts index 14d332e0..46987820 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/request-container.ts @@ -73,10 +73,6 @@ export class RequestsContainer { } } - isHybridLoaderRequested(segment: Segment): boolean { - return !!this.requests.get(segment)?.type; - } - destroy() { for (const request of this.requests.values()) { request.abortFromProcessQueue(); diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index 2657154c..a2f525eb 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -139,6 +139,7 @@ export class Request { if (this._engineCallbacks) { throw new Error("Segment is already requested by engine"); } + this.failedAttempts.clear(); this._isHandledByProcessQueue = false; this._engineCallbacks = callbacks; } @@ -311,7 +312,12 @@ export class Request { } class FailedRequestAttempts { - private readonly attempts: RequestAttempt[] = []; + private attempts: RequestAttempt[] = []; + private _lastClearTimestamp = performance.now(); + + get lastClearTimestamp() { + return this._lastClearTimestamp; + } add(attempt: RequestAttempt) { this.attempts.push(attempt); @@ -323,6 +329,11 @@ class FailedRequestAttempts { 0 ); } + + clear() { + this.attempts = []; + this._lastClearTimestamp = performance.now(); + } } const requestInnerErrorTypes = ["abort", "bytes-receiving-timeout"] as const; From 70a09b15918198daa9afcb6c9321fc8de4cbe19e Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 17:16:24 +0200 Subject: [PATCH 09/11] Throw http unexpected status code error. --- packages/p2p-media-loader-core/src/http-loader.ts | 13 ++++++++++--- packages/p2p-media-loader-core/src/request.ts | 6 +++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index e0995a06..7beac7a3 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -83,15 +83,22 @@ export class HttpRequestExecutor { const { byteRange } = this; if (byteRange) { if (response.status === 200) { - this.request.clearLoadedBytes(); + if (this.request.segment.byteRange) { + throw new RequestError("http-unexpected-status-code"); + } else { + this.request.clearLoadedBytes(); + } } else { if (response.status !== 206) { - this.request.clearLoadedBytes(); - throw new RequestError("http-bytes-mismatch", response.statusText); + throw new RequestError( + "http-unexpected-status-code", + response.statusText + ); } const contentLengthHeader = response.headers.get("Content-Length"); if ( contentLengthHeader && + this.expectedBytesLength !== undefined && this.expectedBytesLength !== +contentLengthHeader ) { this.request.clearLoadedBytes(); diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index a2f525eb..72dabe26 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -338,7 +338,11 @@ class FailedRequestAttempts { const requestInnerErrorTypes = ["abort", "bytes-receiving-timeout"] as const; -const httpRequestErrorTypes = ["http-error", "http-bytes-mismatch"] as const; +const httpRequestErrorTypes = [ + "http-error", + "http-bytes-mismatch", + "http-unexpected-status-code", +] as const; const peerRequestErrorTypes = [ "peer-response-bytes-mismatch", From f28164952b5e9a15fc9bb8cc58d0ecccc0be0312 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 17:29:06 +0200 Subject: [PATCH 10/11] Clear failed attempts list after 60 seconds after last error. --- .../src/hybrid-loader.ts | 2 +- packages/p2p-media-loader-core/src/request.ts | 28 ++++++++++++++----- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index a49ca462..066f0914 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -191,7 +191,7 @@ export class HybridLoader { } request.markHandledByProcessQueue(); if ( - now - request.failedAttempts.lastClearTimestamp > + now - request.failedAttempts.lastAttempt.error.timestamp > FAILED_ATTEMPTS_CLEAR_INTERVAL ) { request.failedAttempts.clear(); diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index 72dabe26..d4de63b2 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -37,7 +37,10 @@ export type RequestControls = Readonly<{ abortOnError: Request["abortOnError"]; }>; -type OmitEncapsulated = Omit; +type OmitEncapsulated = Omit< + T, + "error" | "errorTimestamp" +>; type StartRequestParameters = | OmitEncapsulated | OmitEncapsulated; @@ -244,8 +247,10 @@ export class Request { this._abortRequestCallback?.(error); this.logger(`${this.type} ${this.segment.externalId} failed ${error.type}`); - this.currentAttempt.error = error; - this._failedAttempts.add(this.currentAttempt); + this._failedAttempts.add({ + ...this.currentAttempt, + error, + }); this.notReceivingBytesTimeout.clear(); this.requestProcessQueueCallback(); }; @@ -256,8 +261,10 @@ export class Request { this.setStatus("failed"); this.logger(`${this.type} ${this.segment.externalId} failed ${error.type}`); - this.currentAttempt.error = error; - this._failedAttempts.add(this.currentAttempt); + this._failedAttempts.add({ + ...this.currentAttempt, + error, + }); this.notReceivingBytesTimeout.clear(); this.requestProcessQueueCallback(); }; @@ -312,14 +319,14 @@ export class Request { } class FailedRequestAttempts { - private attempts: RequestAttempt[] = []; + private attempts: Required[] = []; private _lastClearTimestamp = performance.now(); get lastClearTimestamp() { return this._lastClearTimestamp; } - add(attempt: RequestAttempt) { + add(attempt: Required) { this.attempts.push(attempt); } @@ -330,6 +337,10 @@ class FailedRequestAttempts { ); } + get lastAttempt(): Readonly> { + return this.attempts[this.attempts.length - 1]; + } + clear() { this.attempts = []; this._lastClearTimestamp = performance.now(); @@ -362,11 +373,14 @@ type RequestErrorType = export class RequestError< T extends RequestErrorType = RequestErrorType, > extends Error { + readonly timestamp: number; + constructor( readonly type: T, message?: string ) { super(message); + this.timestamp = performance.now(); } } From 04854d1a069c1a186fb928c562e0f8febc605d3b Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 17:30:29 +0200 Subject: [PATCH 11/11] Fix type error. --- packages/p2p-media-loader-core/src/hybrid-loader.ts | 6 ++++-- packages/p2p-media-loader-core/src/request.ts | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 066f0914..e008baf0 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -190,9 +190,11 @@ export class HybridLoader { break; } request.markHandledByProcessQueue(); + + const { lastAttempt } = request.failedAttempts; if ( - now - request.failedAttempts.lastAttempt.error.timestamp > - FAILED_ATTEMPTS_CLEAR_INTERVAL + lastAttempt && + now - lastAttempt.error.timestamp > FAILED_ATTEMPTS_CLEAR_INTERVAL ) { request.failedAttempts.clear(); } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index d4de63b2..2fa68fcc 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -337,7 +337,7 @@ class FailedRequestAttempts { ); } - get lastAttempt(): Readonly> { + get lastAttempt(): Readonly> | undefined { return this.attempts[this.attempts.length - 1]; }