From b2f7215dd7ae3c8edee5e073393785ba1a51321d Mon Sep 17 00:00:00 2001 From: igor Date: Fri, 5 Jan 2024 18:25:26 +0200 Subject: [PATCH] Revise queue generation algorithm. --- p2p-media-loader-demo/src/App.tsx | 10 ++- .../src/bandwidth-calculator.ts | 20 ++++- packages/p2p-media-loader-core/src/core.ts | 17 +++- .../src/hybrid-loader.ts | 85 ++++++++++++++++--- .../p2p-media-loader-core/src/linked-map.ts | 4 + .../p2p-media-loader-core/src/p2p/loader.ts | 9 +- packages/p2p-media-loader-core/src/types.d.ts | 5 ++ .../p2p-media-loader-core/src/utils/queue.ts | 27 +++++- .../p2p-media-loader-core/src/utils/stream.ts | 53 +++++------- packages/p2p-media-loader-hlsjs/src/engine.ts | 2 +- .../src/manifest-parser-decorator.ts | 13 ++- 11 files changed, 179 insertions(+), 66 deletions(-) diff --git a/p2p-media-loader-demo/src/App.tsx b/p2p-media-loader-demo/src/App.tsx index e3c6b214..270a9295 100644 --- a/p2p-media-loader-demo/src/App.tsx +++ b/p2p-media-loader-demo/src/App.tsx @@ -133,7 +133,7 @@ function App() { const initHlsDPlayer = (url: string) => { if (!hlsEngine.current) return; - const engine = hlsEngine.current!; + const engine = hlsEngine.current; const player = new DPlayer({ container: containerRef.current, video: { @@ -141,8 +141,12 @@ function App() { type: "customHls", customType: { customHls: (video: HTMLVideoElement) => { - const hls = new window.Hls(engine.getConfig()); - engine.setHls(hls); + const hls = new window.Hls({ + // ...engine.getConfig(), + maxBufferLength: 20, + maxBufferSize: 0.05 * 1000000, + }); + // engine.setHls(hls); hls.loadSource(video.src); hls.attachMedia(video); hlsInstance.current = hls; diff --git a/packages/p2p-media-loader-core/src/bandwidth-calculator.ts b/packages/p2p-media-loader-core/src/bandwidth-calculator.ts index 86bd3ca2..73c8cbd7 100644 --- a/packages/p2p-media-loader-core/src/bandwidth-calculator.ts +++ b/packages/p2p-media-loader-core/src/bandwidth-calculator.ts @@ -30,7 +30,10 @@ export class BandwidthCalculator { this.allLoadingsStoppedTimestamp = now; } - getBandwidthForLastNSplicedSeconds(seconds: number) { + getBandwidthForLastNSplicedSeconds( + seconds: number, + ignoreThresholdTimestamp = Number.NEGATIVE_INFINITY + ) { if (!this.shiftedTimestamps.length) return 0; const milliseconds = seconds * 1000; const lastItemTimestamp = @@ -41,7 +44,12 @@ export class BandwidthCalculator { for (let i = this.bytes.length - 1; i >= 0; i--) { const timestamp = this.shiftedTimestamps[i]; - if (timestamp < threshold) break; + if ( + timestamp < threshold || + this.timestamps[i] < ignoreThresholdTimestamp + ) { + break; + } lastCountedTimestamp = timestamp; totalBytes += this.bytes[i]; } @@ -49,7 +57,11 @@ export class BandwidthCalculator { return (totalBytes * 8000) / (lastItemTimestamp - lastCountedTimestamp); } - getBandwidthForLastNSeconds(seconds: number, now = performance.now()) { + getBandwidthForLastNSeconds( + seconds: number, + ignoreThresholdTimestamp = Number.NEGATIVE_INFINITY, + now = performance.now() + ) { if (!this.timestamps.length) return 0; const milliseconds = seconds * 1000; const threshold = now - milliseconds; @@ -58,7 +70,7 @@ export class BandwidthCalculator { for (let i = this.bytes.length - 1; i >= 0; i--) { const timestamp = this.timestamps[i]; - if (timestamp < threshold) break; + if (timestamp < threshold || timestamp < ignoreThresholdTimestamp) break; lastCountedTimestamp = timestamp; totalBytes += this.bytes[i]; } diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index aa1741ce..1715ed87 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -7,6 +7,7 @@ import { SegmentBase, CoreEventHandlers, BandwidthCalculators, + StreamDetails, } from "./types"; import * as StreamUtils from "./utils/stream"; import { LinkedMap } from "./linked-map"; @@ -39,8 +40,10 @@ export class Core { private segmentStorage?: SegmentsMemoryStorage; private mainStreamLoader?: HybridLoader; private secondaryStreamLoader?: HybridLoader; - private activeLevelBitrate?: number; - private isLive = false; + private streamDetails: StreamDetails = { + isLive: false, + activeLevelBitrate: 0, + }; constructor(private readonly eventHandlers?: CoreEventHandlers) {} @@ -113,11 +116,15 @@ export class Core { } setActiveLevelBitrate(bitrate: number) { - this.activeLevelBitrate = bitrate; + if (bitrate !== this.streamDetails.activeLevelBitrate) { + this.streamDetails.activeLevelBitrate = bitrate; + this.mainStreamLoader?.notifyLevelChanged(); + this.secondaryStreamLoader?.notifyLevelChanged(); + } } setIsLive(isLive: boolean) { - this.isLive = isLive; + this.streamDetails.isLive = isLive; } destroy(): void { @@ -129,6 +136,7 @@ export class Core { this.secondaryStreamLoader = undefined; this.segmentStorage = undefined; this.manifestResponseUrl = undefined; + this.streamDetails = { isLive: false, activeLevelBitrate: 0 }; } private identifySegment(segmentId: string): Segment { @@ -158,6 +166,7 @@ export class Core { return new HybridLoader( manifestResponseUrl, segment, + this.streamDetails as Required, this.settings, this.bandwidthCalculators, this.segmentStorage, diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 2191dc4a..512aba1a 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -6,6 +6,7 @@ import { CoreEventHandlers, Playback, BandwidthCalculators, + StreamDetails, } from "./types"; import { P2PLoadersContainer } from "./p2p/loaders-container"; import { RequestsContainer } from "./requests/request-container"; @@ -26,23 +27,23 @@ export class HybridLoader { private readonly playback: Playback; private readonly segmentAvgDuration: number; private readonly logger: debug.Debugger; - private lastRequestedSegment: Readonly; private storageCleanUpIntervalId?: number; + private levelChangedTimestamp?: number; private lastQueueProcessingTimeStamp?: number; private randomHttpDownloadInterval?: number; private isProcessQueueMicrotaskCreated = false; constructor( private streamManifestUrl: string, - requestedSegment: Segment, + private lastRequestedSegment: Readonly, + private readonly streamDetails: Required>, private readonly settings: Settings, private readonly bandwidthCalculators: BandwidthCalculators, private readonly segmentStorage: SegmentsMemoryStorage, private readonly eventHandlers?: Pick ) { - this.lastRequestedSegment = requestedSegment; - const activeStream = requestedSegment.stream; - this.playback = { position: requestedSegment.startTime, rate: 1 }; + const activeStream = this.lastRequestedSegment.stream; + this.playback = { position: this.lastRequestedSegment.startTime, rate: 1 }; this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream); this.requests = new RequestsContainer( this.requestProcessQueueMicrotask, @@ -64,7 +65,7 @@ export class HybridLoader { }); this.p2pLoaders = new P2PLoadersContainer( this.streamManifestUrl, - requestedSegment.stream, + this.lastRequestedSegment.stream, this.requests, this.segmentStorage, this.settings @@ -132,7 +133,10 @@ export class HybridLoader { }); }; - private processRequests(queueSegmentIds: Set) { + private processRequests( + queueSegmentIds: Set, + loadedQueuePercent: number + ) { const { stream } = this.lastRequestedSegment; const { httpErrorRetries } = this.settings; const now = performance.now(); @@ -155,7 +159,7 @@ export class HybridLoader { } engineRequest?.resolve( request.data, - this.bandwidthCalculators.all.getBandwidthForLastNSeconds(3) + this.getBandwidth(loadedQueuePercent) ); this.engineRequests.delete(segment); this.requests.remove(request); @@ -200,8 +204,10 @@ export class HybridLoader { } private processQueue() { - const { queue, queueSegmentIds } = this.generateQueue(); - this.processRequests(queueSegmentIds); + const { queue, queueSegmentIds, loadedPercent } = this.generateQueue(); + this.processRequests(queueSegmentIds, loadedPercent); + + console.log(queue.map((i) => i.segment.externalId)); const { simultaneousHttpDownloads, @@ -209,6 +215,22 @@ export class HybridLoader { httpErrorRetries, } = this.settings; + // for (const engineRequest of this.engineRequests.values()) { + // if (this.requests.executingHttpCount >= simultaneousHttpDownloads) break; + // const request = this.requests.get(engineRequest.segment); + // if ( + // !queueSegmentIds.has(engineRequest.segment.localId) && + // engineRequest.status === "pending" && + // (!request || + // request.status === "not-started" || + // (request.status === "failed" && + // request.failedAttempts.httpAttemptsCount < + // this.settings.httpErrorRetries)) + // ) { + // void this.loadThroughHttp(engineRequest.segment); + // } + // } + for (const item of queue) { const { statuses, segment } = item; const request = this.requests.get(segment); @@ -254,7 +276,6 @@ export class HybridLoader { ) { void this.loadThroughP2P(segment); } - break; } if (statuses.isP2PDownloadable) { if (request?.status === "loading") continue; @@ -264,6 +285,7 @@ export class HybridLoader { } if ( + this.p2pLoaders.currentLoader.isSegmentLoadedBySomeone(segment) && this.abortLastP2PLoadingInQueueAfterItem(queue, segment) && this.requests.executingP2PCount < simultaneousP2PDownloads ) { @@ -319,7 +341,7 @@ export class HybridLoader { )) { if ( !statuses.isHttpDownloadable || - p2pLoader.isLoadingOrLoadedBySomeone(segment) || + p2pLoader.isSegmentLoadingOrLoadedBySomeone(segment) || this.segmentStorage.hasSegment(segment) ) { continue; @@ -327,7 +349,8 @@ export class HybridLoader { const request = this.requests.get(segment); if ( request && - (request.status === "succeed" || + (request.status === "loading" || + request.status === "succeed" || (request.failedAttempts.httpAttemptsCount ?? 0) >= httpErrorRetries) ) { continue; @@ -403,10 +426,44 @@ export class HybridLoader { queueSegmentIds, maxPossibleLength, alreadyLoadedAmount, - loadedPercent: (alreadyLoadedAmount / maxPossibleLength) * 100, + loadedPercent: + maxPossibleLength !== 0 + ? (alreadyLoadedAmount / maxPossibleLength) * 100 + : 0, }; } + private getBandwidth(loadedPercentOfQueue: number) { + const { http, all } = this.bandwidthCalculators; + const { activeLevelBitrate } = this.streamDetails; + // console.log("activeLevelBitrate", Math.trunc(activeLevelBitrate / 1000)); + if (this.streamDetails.activeLevelBitrate === 0) { + return all.getBandwidthForLastNSplicedSeconds(3); + } + const { levelChangedTimestamp } = this; + + const bandwidth = all.getBandwidthForLastNSeconds( + 30, + levelChangedTimestamp + ); + const realBandwidth = all.getBandwidthForLastNSplicedSeconds(3); + if (loadedPercentOfQueue >= 80 || bandwidth >= activeLevelBitrate * 0.9) { + // console.log("realBandwidth", Math.trunc(realBandwidth / 1000)); + // console.log("bandwidth", Math.trunc(bandwidth / 1000)); + // console.log("percent", loadedPercentOfQueue); + + return realBandwidth; + } + // console.log("bandwidth", loadedPercentOfQueue); + const httpRealBandwidth = http.getBandwidthForLastNSplicedSeconds(3); + return Math.max(bandwidth, httpRealBandwidth); + } + + notifyLevelChanged() { + this.levelChangedTimestamp = performance.now(); + console.log("LEVEL CHANGED"); + } + updatePlayback(position: number, rate: number) { const isRateChanged = this.playback.rate !== rate; const isPositionChanged = this.playback.position !== position; diff --git a/packages/p2p-media-loader-core/src/linked-map.ts b/packages/p2p-media-loader-core/src/linked-map.ts index 78ee364d..f367dd11 100644 --- a/packages/p2p-media-loader-core/src/linked-map.ts +++ b/packages/p2p-media-loader-core/src/linked-map.ts @@ -75,4 +75,8 @@ export class LinkedMap { getNextTo(key: K): [K, V] | undefined { return this.map.get(key)?.next?.item; } + + getPrevTo(key: K): [K, V] | undefined { + return this.map.get(key)?.prev?.item; + } } diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index f08b01cf..92c9ab84 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -58,13 +58,20 @@ export class P2PLoader { peer.downloadSegment(request); } - isLoadingOrLoadedBySomeone(segment: Segment): boolean { + isSegmentLoadingOrLoadedBySomeone(segment: Segment): boolean { for (const peer of this.trackerClient.peers()) { if (peer.getSegmentStatus(segment)) return true; } return false; } + isSegmentLoadedBySomeone(segment: Segment): boolean { + for (const peer of this.trackerClient.peers()) { + if (peer.getSegmentStatus(segment) === "loaded") return true; + } + return false; + } + get connectedPeersAmount() { let count = 0; // eslint-disable-next-line @typescript-eslint/no-unused-vars diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index e152f92f..89eed0bd 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -77,3 +77,8 @@ export type BandwidthCalculators = Readonly<{ all: BandwidthCalculator; http: BandwidthCalculator; }>; + +export type StreamDetails = { + isLive: boolean; + activeLevelBitrate: number; +}; diff --git a/packages/p2p-media-loader-core/src/utils/queue.ts b/packages/p2p-media-loader-core/src/utils/queue.ts index 907e9724..ba7f9a3f 100644 --- a/packages/p2p-media-loader-core/src/utils/queue.ts +++ b/packages/p2p-media-loader-core/src/utils/queue.ts @@ -19,10 +19,20 @@ export function* generateQueue( if (!first) return; const firstStatuses = getSegmentPlaybackStatuses(first, playback, settings); + // console.log("firstStatuses", firstStatuses, lastRequestedSegment.externalId); if (isNotActualStatuses(firstStatuses)) { - firstStatuses.isHighDemand = true; - yield { segment: first, statuses: firstStatuses }; - + let isFirstYield = false; + const prev = stream.segments.getPrevTo(requestedSegmentId)?.[1]; + if (prev) { + const prevStatuses = getSegmentPlaybackStatuses(prev, playback, settings); + // console.log(prevStatuses); + if (isNotActualStatuses(prevStatuses)) { + // console.log(prevStatuses); + firstStatuses.isHighDemand = true; + yield { segment: first, statuses: firstStatuses }; + isFirstYield = true; + } + } // for cases when engine requests segment that is a little bit // earlier than current playhead position // it could happen when playhead position is significantly changed by user @@ -33,7 +43,12 @@ export function* generateQueue( playback, settings ); + if (isNotActualStatuses(secondStatuses)) return; + if (!isFirstYield) { + firstStatuses.isHighDemand = true; + yield { segment: first, statuses: firstStatuses }; + } yield { segment: second, statuses: secondStatuses }; } else { yield { segment: first, statuses: firstStatuses }; @@ -47,6 +62,10 @@ export function* generateQueue( } function isNotActualStatuses(statuses: SegmentPlaybackStatuses) { - const { isHighDemand, isHttpDownloadable, isP2PDownloadable } = statuses; + const { + isHighDemand = false, + isHttpDownloadable = false, + isP2PDownloadable = false, + } = statuses; return !isHighDemand && !isHttpDownloadable && !isP2PDownloadable; } diff --git a/packages/p2p-media-loader-core/src/utils/stream.ts b/packages/p2p-media-loader-core/src/utils/stream.ts index 2266c381..24c78768 100644 --- a/packages/p2p-media-loader-core/src/utils/stream.ts +++ b/packages/p2p-media-loader-core/src/utils/stream.ts @@ -7,9 +7,9 @@ import { } from "../types"; export type SegmentPlaybackStatuses = { - isHighDemand: boolean; - isHttpDownloadable: boolean; - isP2PDownloadable: boolean; + isHighDemand?: boolean; + isHttpDownloadable?: boolean; + isP2PDownloadable?: boolean; }; export type PlaybackTimeWindowsSettings = Pick< @@ -66,17 +66,13 @@ export function isSegmentActualInPlayback( segment: Readonly, playback: Playback, timeWindowsSettings: PlaybackTimeWindowsSettings -) { - const statuses = getSegmentPlaybackStatuses( - segment, - playback, - timeWindowsSettings - ); - return ( - statuses.isHighDemand || - statuses.isHttpDownloadable || - statuses.isP2PDownloadable - ); +): boolean { + const { + isHighDemand = false, + isHttpDownloadable = false, + isP2PDownloadable = false, + } = getSegmentPlaybackStatuses(segment, playback, timeWindowsSettings); + return isHighDemand || isHttpDownloadable || isP2PDownloadable; } export function getSegmentPlaybackStatuses( @@ -90,23 +86,18 @@ export function getSegmentPlaybackStatuses( p2pDownloadTimeWindow, } = timeWindowsSettings; - return { - isHighDemand: isSegmentInTimeWindow( - segment, - playback, - highDemandTimeWindow - ), - isHttpDownloadable: isSegmentInTimeWindow( - segment, - playback, - httpDownloadTimeWindow - ), - isP2PDownloadable: isSegmentInTimeWindow( - segment, - playback, - p2pDownloadTimeWindow - ), - }; + const statuses: SegmentPlaybackStatuses = {}; + if (isSegmentInTimeWindow(segment, playback, highDemandTimeWindow)) { + statuses.isHighDemand = true; + } + if (isSegmentInTimeWindow(segment, playback, httpDownloadTimeWindow)) { + statuses.isHttpDownloadable = true; + } + if (isSegmentInTimeWindow(segment, playback, p2pDownloadTimeWindow)) { + statuses.isP2PDownloadable = true; + } + + return statuses; } function isSegmentInTimeWindow( diff --git a/packages/p2p-media-loader-hlsjs/src/engine.ts b/packages/p2p-media-loader-hlsjs/src/engine.ts index db04f14f..7896c61e 100644 --- a/packages/p2p-media-loader-hlsjs/src/engine.ts +++ b/packages/p2p-media-loader-hlsjs/src/engine.ts @@ -102,7 +102,7 @@ export class Engine { private handleManifestLoaded = (event: string, data: ManifestLoadedData) => { const { networkDetails } = data; - console.log(data); + console.log(data.levels.map((i) => i.bitrate)); if (networkDetails instanceof XMLHttpRequest) { this.core.setManifestResponseUrl(networkDetails.responseURL); } else if (networkDetails instanceof Response) { diff --git a/packages/p2p-media-loader-shaka/src/manifest-parser-decorator.ts b/packages/p2p-media-loader-shaka/src/manifest-parser-decorator.ts index 2678bbd1..e921456f 100644 --- a/packages/p2p-media-loader-shaka/src/manifest-parser-decorator.ts +++ b/packages/p2p-media-loader-shaka/src/manifest-parser-decorator.ts @@ -110,7 +110,8 @@ export class ManifestParserDecorator implements shaka.extern.ManifestParser { if (!segmentManager) return; const substituteSegmentIndexGet = ( - segmentIndex: shaka.media.SegmentIndex + segmentIndex: shaka.media.SegmentIndex, + callFromCreateSegmentIndexMethod = false ) => { let prevReference: shaka.media.SegmentReference | null = null; let prevFirstItemReference: shaka.media.SegmentReference; @@ -150,8 +151,12 @@ export class ManifestParserDecorator implements shaka.extern.ManifestParser { // This catch is intentionally left blank. // [...segmentIndex] throws an error when segmentIndex inner array is empty } finally { - // do not read VOD segment index again if it has been already read - if (!stream.isSegmentIndexAlreadyRead || !!this.player?.isLive()) { + // do not set custom get again is segment index is already read and stream is VOD + if ( + !stream.isSegmentIndexAlreadyRead || + !!this.player?.isLive() || + !callFromCreateSegmentIndexMethod + ) { segmentIndex.get = customGet; } } @@ -170,7 +175,7 @@ export class ManifestParserDecorator implements shaka.extern.ManifestParser { stream.createSegmentIndex = async () => { const result = await createSegmentIndexOriginal.call(stream); if (!stream.segmentIndex) return result; - substituteSegmentIndexGet(stream.segmentIndex); + substituteSegmentIndexGet(stream.segmentIndex, true); return result; }; }