diff --git a/p2p-media-loader-demo/index.html b/p2p-media-loader-demo/index.html index cc9c4afa..e82de8c5 100644 --- a/p2p-media-loader-demo/index.html +++ b/p2p-media-loader-demo/index.html @@ -9,7 +9,7 @@ + src="https://cdn.jsdelivr.net/npm/shaka-player@~4.4.0/dist/shaka-player.compiled.min.js"> diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index 20cbada8..0ad32550 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -10,7 +10,7 @@ import { import * as StreamUtils from "./utils/stream"; import { LinkedMap } from "./linked-map"; import { BandwidthCalculator } from "./bandwidth-calculator"; -import { EngineCallbacks } from "./request"; +import { EngineCallbacks } from "./requests/engine-request"; import { SegmentsMemoryStorage } from "./segments-storage"; export class Core { diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index 7beac7a3..59980b9a 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -4,7 +4,7 @@ import { RequestError, HttpRequestErrorType, RequestControls, -} from "./request"; +} from "./requests/request"; type HttpSettings = Pick; diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 783e232b..22f6518b 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -4,8 +4,8 @@ import { SegmentsMemoryStorage } from "./segments-storage"; import { Settings, CoreEventHandlers, Playback } from "./types"; import { BandwidthCalculator } from "./bandwidth-calculator"; import { P2PLoadersContainer } from "./p2p/loaders-container"; -import { RequestsContainer } from "./request-container"; -import { EngineCallbacks } from "./request"; +import { RequestsContainer } from "./requests/request-container"; +import { EngineRequest, EngineCallbacks } from "./requests/engine-request"; import * as QueueUtils from "./utils/queue"; import * as LoggerUtils from "./utils/logger"; import * as StreamUtils from "./utils/stream"; @@ -25,6 +25,7 @@ export class HybridLoader { private randomHttpDownloadInterval!: number; private readonly logger: debug.Debugger; private isProcessQueueMicrotaskCreated = false; + private readonly engineRequests = new Map(); constructor( private streamManifestUrl: string, @@ -88,18 +89,18 @@ export class HybridLoader { } this.lastRequestedSegment = segment; + const engineRequest = new EngineRequest(segment, callbacks); if (this.segmentStorage.hasSegment(segment)) { // TODO: error handling const data = await this.segmentStorage.getSegmentData(segment); if (data) { - callbacks.onSuccess({ + engineRequest.resolve( data, - bandwidth: this.bandwidthCalculator.getBandwidthForLastNSeconds(3), - }); + this.bandwidthCalculator.getBandwidthForLastNSeconds(3) + ); } } else { - const request = this.requests.getOrCreateRequest(segment); - request.setEngineCallbacks(callbacks); + this.engineRequests.set(segment, engineRequest); } this.requestProcessQueueMicrotask(); } @@ -131,53 +132,45 @@ export class HybridLoader { const { httpErrorRetries } = this.settings; const now = performance.now(); for (const request of this.requests.items()) { - const { - type, - status, - segment, - isHandledByProcessQueue, - isSegmentRequestedByEngine, - } = request; - - if (!type) continue; + const { type, status, segment, isHandledByProcessQueue } = request; + const engineRequest = this.engineRequests.get(segment); switch (status) { case "loading": - if ( - !isSegmentRequestedByEngine && - !queueSegmentIds.has(segment.localId) - ) { + if (!queueSegmentIds.has(segment.localId) && !engineRequest) { request.abortFromProcessQueue(); this.requests.remove(request); } break; case "succeed": - if (!request.data) break; + if (!request.data || !type) break; if (type === "http") { this.p2pLoaders.currentLoader.broadcastAnnouncement(); } - request.resolveEngineCallbacksSuccessfully(); + engineRequest?.resolve( + request.data, + this.bandwidthCalculator.getBandwidthForLastNSeconds(3) + ); + this.engineRequests.delete(segment); + this.requests.remove(request); void this.segmentStorage.storeSegment(request.segment, request.data); this.eventHandlers?.onSegmentLoaded?.(request.data.byteLength, type); - this.requests.remove(request); break; case "failed": if (type === "http" && !isHandledByProcessQueue) { this.p2pLoaders.currentLoader.broadcastAnnouncement(); } - if ( - !isSegmentRequestedByEngine && - !stream.segments.has(request.segment.localId) - ) { + if (!engineRequest && !stream.segments.has(request.segment.localId)) { this.requests.remove(request); } if ( request.failedAttempts.httpAttemptsCount >= httpErrorRetries && - isSegmentRequestedByEngine + engineRequest ) { - request.resolveEngineCallbacksWithError(); + engineRequest.reject(); + this.engineRequests.delete(segment); } break; @@ -189,8 +182,8 @@ export class HybridLoader { this.requests.remove(request); break; } - request.markHandledByProcessQueue(); + request.markHandledByProcessQueue(); const { lastAttempt } = request.failedAttempts; if ( lastAttempt && @@ -221,6 +214,22 @@ export class HybridLoader { httpErrorRetries, } = this.settings; + for (const engineRequest of this.engineRequests.values()) { + if (this.requests.executingHttpCount >= simultaneousHttpDownloads) break; + const request = this.requests.get(engineRequest.segment); + if ( + !queueSegmentIds.has(engineRequest.segment.localId) && + engineRequest.status === "pending" && + (!request || + request.status === "not-started" || + (request.status === "failed" && + request.failedAttempts.httpAttemptsCount < + this.settings.httpErrorRetries)) + ) { + void this.loadThroughHttp(engineRequest.segment); + } + } + for (const item of queue) { const { statuses, segment } = item; const request = this.requests.get(segment); @@ -239,7 +248,7 @@ export class HybridLoader { request?.status === "loading" && request.type === "p2p"; if (this.requests.executingHttpCount < simultaneousHttpDownloads) { - if (isP2PLoadingRequest) request.abortFromEngine(); + if (isP2PLoadingRequest) request.abortFromProcessQueue(); void this.loadThroughHttp(segment); continue; } @@ -248,7 +257,7 @@ export class HybridLoader { this.abortLastHttpLoadingInQueueAfterItem(queue, segment) && this.requests.executingHttpCount < simultaneousHttpDownloads ) { - if (isP2PLoadingRequest) request.abortFromEngine(); + if (isP2PLoadingRequest) request.abortFromProcessQueue(); void this.loadThroughHttp(segment); continue; } @@ -288,10 +297,17 @@ export class HybridLoader { // api method for engines abortSegmentRequest(segmentLocalId: string) { - const request = this.requests.getBySegmentLocalId(segmentLocalId); - if (!request) return; - request.abortFromEngine(); - this.logger("abort: ", LoggerUtils.getSegmentString(request.segment)); + for (const engineRequest of this.engineRequests.values()) { + if (segmentLocalId === engineRequest.segment.localId) { + engineRequest.abort(); + this.engineRequests.delete(engineRequest.segment); + this.logger( + "abort: ", + LoggerUtils.getSegmentString(engineRequest.segment) + ); + break; + } + } } private async loadThroughHttp(segment: Segment) { diff --git a/packages/p2p-media-loader-core/src/index.ts b/packages/p2p-media-loader-core/src/index.ts index 08df54ac..4bf4dba2 100644 --- a/packages/p2p-media-loader-core/src/index.ts +++ b/packages/p2p-media-loader-core/src/index.ts @@ -2,5 +2,5 @@ export { Core } from "./core"; export type * from "./types"; -export { CoreRequestError } from "./request"; -export type { EngineCallbacks } from "./request"; +export { CoreRequestError } from "./requests/engine-request"; +export type { EngineCallbacks } from "./requests/engine-request"; diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index c276ba8b..f08b01cf 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -1,8 +1,8 @@ import { Peer } from "./peer"; import { Segment, Settings, StreamWithSegments } from "../types"; import { SegmentsMemoryStorage } from "../segments-storage"; -import { RequestsContainer } from "../request-container"; -import { Request } from "../request"; +import { RequestsContainer } from "../requests/request-container"; +import { Request } from "../requests/request"; import { P2PTrackerClient } from "./tracker-client"; import * as StreamUtils from "../utils/stream"; import * as Utils from "../utils/utils"; diff --git a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts index f36a6263..34b02415 100644 --- a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts +++ b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts @@ -1,7 +1,7 @@ import { P2PLoader } from "./loader"; import debug from "debug"; import { Settings, Stream, StreamWithSegments } from "../index"; -import { RequestsContainer } from "../request-container"; +import { RequestsContainer } from "../requests/request-container"; import { SegmentsMemoryStorage } from "../segments-storage"; import * as LoggerUtils from "../utils/logger"; diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index ab98e4c9..fac4c767 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -6,7 +6,7 @@ import { RequestError, PeerRequestErrorType, RequestInnerErrorType, -} from "../request"; +} from "../requests/request"; import * as Command from "./commands"; import { Segment } from "../types"; import * as Utils from "../utils/utils"; diff --git a/packages/p2p-media-loader-core/src/requests/engine-request.ts b/packages/p2p-media-loader-core/src/requests/engine-request.ts new file mode 100644 index 00000000..57b276fb --- /dev/null +++ b/packages/p2p-media-loader-core/src/requests/engine-request.ts @@ -0,0 +1,49 @@ +import { Segment, SegmentResponse } from "../types"; + +export type EngineCallbacks = { + onSuccess: (response: SegmentResponse) => void; + onError: (reason: CoreRequestError) => void; +}; + +export class EngineRequest { + private _status: "pending" | "succeed" | "failed" | "aborted" = "pending"; + + constructor( + readonly segment: Segment, + readonly engineCallbacks: EngineCallbacks + ) {} + + get status() { + return this._status; + } + + resolve(data: ArrayBuffer, bandwidth: number) { + this.throwErrorIfNotPending(); + this._status = "succeed"; + this.engineCallbacks.onSuccess({ data, bandwidth }); + } + + reject() { + this.throwErrorIfNotPending(); + this._status = "failed"; + this.engineCallbacks.onError(new CoreRequestError("failed")); + } + + abort() { + this.throwErrorIfNotPending(); + this._status = "aborted"; + this.engineCallbacks.onError(new CoreRequestError("aborted")); + } + + private throwErrorIfNotPending() { + if (this._status !== "pending") { + throw new Error("Engine request has been already settled."); + } + } +} + +export class CoreRequestError extends Error { + constructor(readonly type: "failed" | "aborted") { + super(); + } +} diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/requests/request-container.ts similarity index 84% rename from packages/p2p-media-loader-core/src/request-container.ts rename to packages/p2p-media-loader-core/src/requests/request-container.ts index b0bf517b..bc0edca2 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/requests/request-container.ts @@ -1,5 +1,5 @@ -import { Segment, Settings, Playback } from "./types"; -import { BandwidthCalculator } from "./bandwidth-calculator"; +import { Segment, Settings, Playback } from "../types"; +import { BandwidthCalculator } from "../bandwidth-calculator"; import { Request } from "./request"; export class RequestsContainer { @@ -32,12 +32,6 @@ export class RequestsContainer { return this.requests.get(segment); } - getBySegmentLocalId(id: string) { - for (const request of this.requests.values()) { - if (request.segment.localId === id) return request; - } - } - getOrCreateRequest(segment: Segment) { let request = this.requests.get(segment); if (!request) { @@ -76,7 +70,6 @@ export class RequestsContainer { destroy() { for (const request of this.requests.values()) { request.abortFromProcessQueue(); - request.abortFromEngine(); } this.requests.clear(); } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/requests/request.ts similarity index 86% rename from packages/p2p-media-loader-core/src/request.ts rename to packages/p2p-media-loader-core/src/requests/request.ts index f7ec5b7a..bfc14d4c 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/requests/request.ts @@ -1,15 +1,10 @@ -import { Segment, SegmentResponse, Playback } from "./types"; -import { BandwidthCalculator } from "./bandwidth-calculator"; -import * as StreamUtils from "./utils/stream"; -import * as Utils from "./utils/utils"; -import * as LoggerUtils from "./utils/logger"; +import { Segment, Playback } from "../types"; +import { BandwidthCalculator } from "../bandwidth-calculator"; +import * as StreamUtils from "../utils/stream"; +import * as Utils from "../utils/utils"; +import * as LoggerUtils from "../utils/logger"; import debug from "debug"; -export type EngineCallbacks = { - onSuccess: (response: SegmentResponse) => void; - onError: (reason: CoreRequestError) => void; -}; - export type LoadProgress = { startTimestamp: number; lastLoadedChunkTimestamp?: number; @@ -54,7 +49,6 @@ export type RequestStatus = export class Request { readonly id: string; - private _engineCallbacks?: EngineCallbacks; private currentAttempt?: RequestAttempt; private _failedAttempts = new FailedRequestAttempts(); private finalData?: ArrayBuffer; @@ -104,10 +98,6 @@ export class Request { this._isHandledByProcessQueue = false; } - get isSegmentRequestedByEngine(): boolean { - return !!this._engineCallbacks; - } - get type() { return this.currentAttempt?.type; } @@ -138,15 +128,6 @@ export class Request { this._isHandledByProcessQueue = true; } - setEngineCallbacks(callbacks: EngineCallbacks) { - if (this._engineCallbacks) { - throw new Error("Segment is already requested by engine"); - } - this.failedAttempts.clear(); - this._isHandledByProcessQueue = false; - this._engineCallbacks = callbacks; - } - setTotalBytes(value: number) { if (this._totalBytes !== undefined) { throw new Error("Request total bytes value is already set"); @@ -205,25 +186,6 @@ export class Request { }; } - resolveEngineCallbacksSuccessfully() { - if (!this.finalData) return; - const bandwidth = this.bandwidthCalculator.getBandwidthForLastNSeconds(3); - this._engineCallbacks?.onSuccess({ data: this.finalData, bandwidth }); - this._engineCallbacks = undefined; - } - - resolveEngineCallbacksWithError() { - this._engineCallbacks?.onError(new CoreRequestError("failed")); - this._engineCallbacks = undefined; - } - - abortFromEngine() { - if (this._status !== "loading") return; - this._engineCallbacks?.onError(new CoreRequestError("aborted")); - this._engineCallbacks = undefined; - this.requestProcessQueueCallback(); - } - abortFromProcessQueue() { this.throwErrorIfNotLoadingStatus(); this.setStatus("aborted"); @@ -380,12 +342,6 @@ export class RequestError< } } -export class CoreRequestError extends Error { - constructor(readonly type: "failed" | "aborted") { - super(); - } -} - export class Timeout { private timeoutId?: number; private ms?: number; diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index 78d36e21..1f00f738 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -1,5 +1,5 @@ import { LinkedMap } from "./linked-map"; -import { RequestAttempt } from "./request"; +import { RequestAttempt } from "./requests/request"; export type StreamType = "main" | "secondary";