From 2bb356e46f4c89c964630a5523341534128a5e48 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Wed, 20 Sep 2023 15:56:32 +0300 Subject: [PATCH] Create hybrid loaders when necessary data is already set. --- packages/p2p-media-loader-core/src/core.ts | 46 +++++++++++-------- .../src/hybrid-loader.ts | 31 +++++-------- packages/p2p-media-loader-core/src/request.ts | 16 +++++-- .../src/segments-storage.ts | 37 +++++++++------ packages/p2p-media-loader-core/src/types.ts | 2 +- packages/p2p-media-loader-core/src/utils.ts | 4 +- 6 files changed, 76 insertions(+), 60 deletions(-) diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index dc2cab94..2c8c2fe8 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -17,16 +17,11 @@ export class Core { cachedSegmentsCount: 50, }; private readonly bandwidthApproximator = new BandwidthApproximator(); - private readonly mainStreamLoader: HybridLoader = new HybridLoader( - this.settings, - this.bandwidthApproximator - ); + private mainStreamLoader?: HybridLoader; private secondaryStreamLoader?: HybridLoader; setManifestResponseUrl(url: string): void { this.manifestResponseUrl = url.split("?")[0]; - this.mainStreamLoader.setStreamManifestUrl(this.manifestResponseUrl); - this.secondaryStreamLoader?.setStreamManifestUrl(this.manifestResponseUrl); } hasSegment(segmentLocalId: string): boolean { @@ -61,32 +56,23 @@ export class Core { loadSegment(segmentLocalId: string, callbacks: EngineCallbacks) { const { segment, stream } = this.identifySegment(segmentLocalId); - - let loader: HybridLoader; - if (stream.type === "main") { - loader = this.mainStreamLoader; - } else { - this.secondaryStreamLoader = - this.secondaryStreamLoader ?? - new HybridLoader(this.settings, this.bandwidthApproximator); - loader = this.secondaryStreamLoader; - } + const loader = this.getStreamHybridLoader(segment, stream); void loader.loadSegment(segment, stream, callbacks); } abortSegmentLoading(segmentId: string): void { - this.mainStreamLoader.abortSegment(segmentId); + this.mainStreamLoader?.abortSegment(segmentId); this.secondaryStreamLoader?.abortSegment(segmentId); } updatePlayback(position: number, rate: number): void { - this.mainStreamLoader.updatePlayback(position, rate); + this.mainStreamLoader?.updatePlayback(position, rate); this.secondaryStreamLoader?.updatePlayback(position, rate); } destroy(): void { this.streams.clear(); - this.mainStreamLoader.destroy(); + this.mainStreamLoader?.destroy(); this.secondaryStreamLoader?.destroy(); this.manifestResponseUrl = undefined; } @@ -104,4 +90,26 @@ export class Core { return { segment, stream }; } + + private getStreamHybridLoader(segment: Segment, stream: StreamWithSegments) { + if (!this.manifestResponseUrl) { + throw new Error("Manifest response url is not defined"); + } + const streamTypeLoaderKeyMap = { + main: "mainStreamLoader", + secondary: "secondaryStreamLoader", + } as const; + const { type } = stream; + const loaderKey = streamTypeLoaderKeyMap[type]; + + return (this[loaderKey] = + this[loaderKey] ?? + new HybridLoader( + this.manifestResponseUrl, + segment, + stream, + this.settings, + this.bandwidthApproximator + )); + } } diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index c57f22f8..7a9dc4a6 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -10,23 +10,28 @@ import * as Utils from "./utils"; import { FetchError } from "./errors"; export class HybridLoader { - private streamManifestUrl?: string; private readonly requests = new RequestContainer(); private p2pLoader?: P2PLoader; private readonly segmentStorage: SegmentsMemoryStorage; private storageCleanUpIntervalId?: number; - private activeStream?: Readonly; - private lastRequestedSegment?: Readonly; - private playback?: Playback; + private activeStream: Readonly; + private lastRequestedSegment: Readonly; + private readonly playback: Playback; private lastQueueProcessingTimeStamp?: number; constructor( + private streamManifestUrl: string, + requestedSegment: Segment, + requestedStream: Readonly, private readonly settings: Settings, private readonly bandwidthApproximator: BandwidthApproximator ) { + this.lastRequestedSegment = requestedSegment; + this.activeStream = requestedStream; + this.playback = { position: requestedSegment.startTime, rate: 1 }; this.segmentStorage = new SegmentsMemoryStorage(this.settings); this.segmentStorage.setIsSegmentLockedPredicate((segment) => { - if (!this.playback || !this.activeStream?.segments.has(segment.localId)) { + if (!this.activeStream.segments.has(segment.localId)) { return false; } const bufferRanges = Utils.getLoadBufferRanges( @@ -42,12 +47,7 @@ export class HybridLoader { ); } - setStreamManifestUrl(url: string) { - this.streamManifestUrl = url; - } - private createP2PLoader(stream: StreamWithSegments) { - if (!this.streamManifestUrl) return; this.p2pLoader = new P2PLoader( this.streamManifestUrl, stream, @@ -62,9 +62,6 @@ export class HybridLoader { stream: Readonly, callbacks: EngineCallbacks ) { - if (!this.playback) { - this.playback = { position: segment.startTime, rate: 1 }; - } if (stream !== this.activeStream) { this.activeStream = stream; this.createP2PLoader(stream); @@ -85,9 +82,6 @@ export class HybridLoader { } private async processQueue(force = true) { - if (!this.activeStream || !this.lastRequestedSegment || !this.playback) { - return; - } const now = performance.now(); if ( !force && @@ -98,13 +92,12 @@ export class HybridLoader { } this.lastQueueProcessingTimeStamp = now; - const storedSegmentIds = await this.segmentStorage.getStoredSegmentIds(); const { queue, queueSegmentIds } = Utils.generateQueue({ segment: this.lastRequestedSegment, stream: this.activeStream, playback: this.playback, settings: this.settings, - isSegmentLoaded: (segmentId) => storedSegmentIds.has(segmentId), + isSegmentLoaded: (segmentId) => this.segmentStorage.hasSegment(segmentId), }); this.requests.abortAllNotRequestedByEngine((segmentId) => @@ -175,7 +168,6 @@ export class HybridLoader { } updatePlayback(position: number, rate: number) { - if (!this.playback) return; const isRateChanged = this.playback.rate !== rate; const isPositionChanged = this.playback.position !== position; @@ -191,7 +183,6 @@ export class HybridLoader { this.storageCleanUpIntervalId = undefined; void this.segmentStorage.destroy(); this.requests.destroy(); - this.playback = undefined; } } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index 75415ea8..d2ca3492 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -27,11 +27,15 @@ type Request = { engineCallbacks?: Readonly; }; +function getRequestItemId(segment: Segment) { + return segment.localId; +} + export class RequestContainer { private readonly requests = new Map(); addLoaderRequest(segment: Segment, loaderRequest: HybridLoaderRequest) { - const segmentId = segment.localId; + const segmentId = getRequestItemId(segment); const existingRequest = this.requests.get(segmentId); if (existingRequest) { existingRequest.loaderRequest = loaderRequest; @@ -41,13 +45,13 @@ export class RequestContainer { loaderRequest, }); } - loaderRequest.promise.finally(() => + loaderRequest.promise.then(() => this.clearRequestItem(segmentId, "loader") ); } addEngineCallbacks(segment: Segment, engineCallbacks: EngineCallbacks) { - const segmentId = segment.localId; + const segmentId = getRequestItemId(segment); const requestItem = this.requests.get(segmentId); if (requestItem) { requestItem.engineCallbacks = engineCallbacks; @@ -125,7 +129,8 @@ export class RequestContainer { if (type === "engine") delete requestItem.engineCallbacks; if (type === "loader") delete requestItem.loaderRequest; if (!requestItem.engineCallbacks && !requestItem.loaderRequest) { - this.requests.delete(requestItem.segment.localId); + const segmentId = getRequestItemId(requestItem.segment); + this.requests.delete(segmentId); } } @@ -136,7 +141,8 @@ export class RequestContainer { segment, } of this.requests.values()) { if (!engineCallbacks) continue; - if (!isLocked(segment.localId) && loaderRequest) loaderRequest.abort(); + const segmentId = getRequestItemId(segment); + if (!isLocked(segmentId) && loaderRequest) loaderRequest.abort(); } } diff --git a/packages/p2p-media-loader-core/src/segments-storage.ts b/packages/p2p-media-loader-core/src/segments-storage.ts index 48bb27a5..8dec3af3 100644 --- a/packages/p2p-media-loader-core/src/segments-storage.ts +++ b/packages/p2p-media-loader-core/src/segments-storage.ts @@ -5,6 +5,7 @@ export class SegmentsMemoryStorage { string, { segment: Segment; data: ArrayBuffer; lastAccessed: number } >(); + private readonly cachedSegmentIds = new Set(); private isSegmentLockedPredicate?: (segment: Segment) => boolean; private onUpdateSubscriptions: (() => void)[] = []; @@ -15,6 +16,10 @@ export class SegmentsMemoryStorage { } ) {} + async initialize(masterManifestUrl: string) { + // empty + } + setIsSegmentLockedPredicate(predicate: (segment: Segment) => boolean) { this.isSegmentLockedPredicate = predicate; } @@ -24,28 +29,28 @@ export class SegmentsMemoryStorage { } async storeSegment(segment: Segment, data: ArrayBuffer) { - this.cache.set(segment.localId, { + const id = segment.externalId; + this.cache.set(id, { segment, data, lastAccessed: performance.now(), }); + this.cachedSegmentIds.add(id); this.onUpdateSubscriptions.forEach((c) => c()); } - async getSegmentData(segmentId: string): Promise { - const cacheItem = this.cache.get(segmentId); + async getSegmentData( + segmentExternalId: string + ): Promise { + const cacheItem = this.cache.get(segmentExternalId); if (cacheItem === undefined) return undefined; cacheItem.lastAccessed = performance.now(); return cacheItem.data; } - async getStoredSegmentIds() { - const segmentIds = new Set(); - for (const segmentId of this.cache.keys()) { - segmentIds.add(segmentId); - } - return segmentIds; + hasSegment(segmentExternalId: string): boolean { + return this.cachedSegmentIds.has(segmentExternalId); } async clear(): Promise { @@ -58,10 +63,13 @@ export class SegmentsMemoryStorage { // Delete old segments const now = performance.now(); - for (const [segmentId, { lastAccessed, segment }] of this.cache.entries()) { + for (const [ + segmentExternalId, + { lastAccessed, segment }, + ] of this.cache.entries()) { if (now - lastAccessed > this.settings.cachedSegmentExpiration) { if (!this.isSegmentLockedPredicate?.(segment)) { - segmentsToDelete.push(segmentId); + segmentsToDelete.push(segmentExternalId); } } else { remainingSegments.push({ segment, lastAccessed }); @@ -76,14 +84,17 @@ export class SegmentsMemoryStorage { for (const cachedSegment of remainingSegments) { if (!this.isSegmentLockedPredicate?.(cachedSegment.segment)) { - segmentsToDelete.push(cachedSegment.segment.localId); + segmentsToDelete.push(cachedSegment.segment.externalId); countOverhead--; if (countOverhead === 0) break; } } } - segmentsToDelete.forEach((id) => this.cache.delete(id)); + segmentsToDelete.forEach((id) => { + this.cache.delete(id); + this.cachedSegmentIds.delete(id); + }); if (segmentsToDelete.length) { this.onUpdateSubscriptions.forEach((c) => c()); } diff --git a/packages/p2p-media-loader-core/src/types.ts b/packages/p2p-media-loader-core/src/types.ts index e8dd2f9c..f9cd65f0 100644 --- a/packages/p2p-media-loader-core/src/types.ts +++ b/packages/p2p-media-loader-core/src/types.ts @@ -8,7 +8,7 @@ export type ByteRange = { start: number; end: number }; export type Segment = { readonly localId: string; - readonly externalId: number; + readonly externalId: string; readonly url: string; readonly byteRange?: ByteRange; readonly startTime: number; diff --git a/packages/p2p-media-loader-core/src/utils.ts b/packages/p2p-media-loader-core/src/utils.ts index 4cc59206..d5851e4a 100644 --- a/packages/p2p-media-loader-core/src/utils.ts +++ b/packages/p2p-media-loader-core/src/utils.ts @@ -42,7 +42,7 @@ export function generateQueue({ stream: Readonly; segment: Readonly; playback: Readonly; - isSegmentLoaded: (segmentId: string) => boolean; + isSegmentLoaded: (segmentExternalId: string) => boolean; settings: Pick< Settings, "highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow" @@ -64,7 +64,7 @@ export function generateQueue({ for (const segment of stream.segments.values(requestedSegmentId)) { const statuses = getSegmentLoadStatuses(segment, bufferRanges); if (!statuses && !(i === 0 && isNextSegmentHighDemand)) break; - if (isSegmentLoaded(segment.localId)) continue; + if (isSegmentLoaded(segment.externalId)) continue; queueSegmentIds.add(segment.localId); statuses.isHighDemand = true;