Skip to content

Commit

Permalink
Revise queue generation algorithm.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Jan 5, 2024
1 parent 21866b2 commit b2f7215
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 66 deletions.
10 changes: 7 additions & 3 deletions p2p-media-loader-demo/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,20 @@ function App() {

const initHlsDPlayer = (url: string) => {
if (!hlsEngine.current) return;
const engine = hlsEngine.current!;
const engine = hlsEngine.current;
const player = new DPlayer({
container: containerRef.current,
video: {
url,
type: "customHls",
customType: {
customHls: (video: HTMLVideoElement) => {
const hls = new window.Hls(engine.getConfig());
engine.setHls(hls);
const hls = new window.Hls({
// ...engine.getConfig(),
maxBufferLength: 20,
maxBufferSize: 0.05 * 1000000,
});
// engine.setHls(hls);
hls.loadSource(video.src);
hls.attachMedia(video);
hlsInstance.current = hls;
Expand Down
20 changes: 16 additions & 4 deletions packages/p2p-media-loader-core/src/bandwidth-calculator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ export class BandwidthCalculator {
this.allLoadingsStoppedTimestamp = now;
}

getBandwidthForLastNSplicedSeconds(seconds: number) {
getBandwidthForLastNSplicedSeconds(
seconds: number,
ignoreThresholdTimestamp = Number.NEGATIVE_INFINITY
) {
if (!this.shiftedTimestamps.length) return 0;
const milliseconds = seconds * 1000;
const lastItemTimestamp =
Expand All @@ -41,15 +44,24 @@ export class BandwidthCalculator {

for (let i = this.bytes.length - 1; i >= 0; i--) {
const timestamp = this.shiftedTimestamps[i];
if (timestamp < threshold) break;
if (
timestamp < threshold ||
this.timestamps[i] < ignoreThresholdTimestamp
) {
break;
}
lastCountedTimestamp = timestamp;
totalBytes += this.bytes[i];
}

return (totalBytes * 8000) / (lastItemTimestamp - lastCountedTimestamp);
}

getBandwidthForLastNSeconds(seconds: number, now = performance.now()) {
getBandwidthForLastNSeconds(
seconds: number,
ignoreThresholdTimestamp = Number.NEGATIVE_INFINITY,
now = performance.now()
) {
if (!this.timestamps.length) return 0;
const milliseconds = seconds * 1000;
const threshold = now - milliseconds;
Expand All @@ -58,7 +70,7 @@ export class BandwidthCalculator {

for (let i = this.bytes.length - 1; i >= 0; i--) {
const timestamp = this.timestamps[i];
if (timestamp < threshold) break;
if (timestamp < threshold || timestamp < ignoreThresholdTimestamp) break;
lastCountedTimestamp = timestamp;
totalBytes += this.bytes[i];
}
Expand Down
17 changes: 13 additions & 4 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
SegmentBase,
CoreEventHandlers,
BandwidthCalculators,
StreamDetails,
} from "./types";
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
Expand Down Expand Up @@ -39,8 +40,10 @@ export class Core<TStream extends Stream = Stream> {
private segmentStorage?: SegmentsMemoryStorage;
private mainStreamLoader?: HybridLoader;
private secondaryStreamLoader?: HybridLoader;
private activeLevelBitrate?: number;
private isLive = false;
private streamDetails: StreamDetails = {
isLive: false,
activeLevelBitrate: 0,
};

constructor(private readonly eventHandlers?: CoreEventHandlers) {}

Expand Down Expand Up @@ -113,11 +116,15 @@ export class Core<TStream extends Stream = Stream> {
}

setActiveLevelBitrate(bitrate: number) {
this.activeLevelBitrate = bitrate;
if (bitrate !== this.streamDetails.activeLevelBitrate) {
this.streamDetails.activeLevelBitrate = bitrate;
this.mainStreamLoader?.notifyLevelChanged();
this.secondaryStreamLoader?.notifyLevelChanged();
}
}

setIsLive(isLive: boolean) {
this.isLive = isLive;
this.streamDetails.isLive = isLive;
}

destroy(): void {
Expand All @@ -129,6 +136,7 @@ export class Core<TStream extends Stream = Stream> {
this.secondaryStreamLoader = undefined;
this.segmentStorage = undefined;
this.manifestResponseUrl = undefined;
this.streamDetails = { isLive: false, activeLevelBitrate: 0 };
}

private identifySegment(segmentId: string): Segment {
Expand Down Expand Up @@ -158,6 +166,7 @@ export class Core<TStream extends Stream = Stream> {
return new HybridLoader(
manifestResponseUrl,
segment,
this.streamDetails as Required<StreamDetails>,
this.settings,
this.bandwidthCalculators,
this.segmentStorage,
Expand Down
85 changes: 71 additions & 14 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
CoreEventHandlers,
Playback,
BandwidthCalculators,
StreamDetails,
} from "./types";
import { P2PLoadersContainer } from "./p2p/loaders-container";
import { RequestsContainer } from "./requests/request-container";
Expand All @@ -26,23 +27,23 @@ export class HybridLoader {
private readonly playback: Playback;
private readonly segmentAvgDuration: number;
private readonly logger: debug.Debugger;
private lastRequestedSegment: Readonly<Segment>;
private storageCleanUpIntervalId?: number;
private levelChangedTimestamp?: number;
private lastQueueProcessingTimeStamp?: number;
private randomHttpDownloadInterval?: number;
private isProcessQueueMicrotaskCreated = false;

constructor(
private streamManifestUrl: string,
requestedSegment: Segment,
private lastRequestedSegment: Readonly<Segment>,
private readonly streamDetails: Required<Readonly<StreamDetails>>,
private readonly settings: Settings,
private readonly bandwidthCalculators: BandwidthCalculators,
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly eventHandlers?: Pick<CoreEventHandlers, "onSegmentLoaded">
) {
this.lastRequestedSegment = requestedSegment;
const activeStream = requestedSegment.stream;
this.playback = { position: requestedSegment.startTime, rate: 1 };
const activeStream = this.lastRequestedSegment.stream;
this.playback = { position: this.lastRequestedSegment.startTime, rate: 1 };
this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream);
this.requests = new RequestsContainer(
this.requestProcessQueueMicrotask,
Expand All @@ -64,7 +65,7 @@ export class HybridLoader {
});
this.p2pLoaders = new P2PLoadersContainer(
this.streamManifestUrl,
requestedSegment.stream,
this.lastRequestedSegment.stream,
this.requests,
this.segmentStorage,
this.settings
Expand Down Expand Up @@ -132,7 +133,10 @@ export class HybridLoader {
});
};

private processRequests(queueSegmentIds: Set<string>) {
private processRequests(
queueSegmentIds: Set<string>,
loadedQueuePercent: number
) {
const { stream } = this.lastRequestedSegment;
const { httpErrorRetries } = this.settings;
const now = performance.now();
Expand All @@ -155,7 +159,7 @@ export class HybridLoader {
}
engineRequest?.resolve(
request.data,
this.bandwidthCalculators.all.getBandwidthForLastNSeconds(3)
this.getBandwidth(loadedQueuePercent)
);
this.engineRequests.delete(segment);
this.requests.remove(request);
Expand Down Expand Up @@ -200,15 +204,33 @@ export class HybridLoader {
}

private processQueue() {
const { queue, queueSegmentIds } = this.generateQueue();
this.processRequests(queueSegmentIds);
const { queue, queueSegmentIds, loadedPercent } = this.generateQueue();
this.processRequests(queueSegmentIds, loadedPercent);

console.log(queue.map((i) => i.segment.externalId));

const {
simultaneousHttpDownloads,
simultaneousP2PDownloads,
httpErrorRetries,
} = this.settings;

// for (const engineRequest of this.engineRequests.values()) {
// if (this.requests.executingHttpCount >= simultaneousHttpDownloads) break;
// const request = this.requests.get(engineRequest.segment);
// if (
// !queueSegmentIds.has(engineRequest.segment.localId) &&
// engineRequest.status === "pending" &&
// (!request ||
// request.status === "not-started" ||
// (request.status === "failed" &&
// request.failedAttempts.httpAttemptsCount <
// this.settings.httpErrorRetries))
// ) {
// void this.loadThroughHttp(engineRequest.segment);
// }
// }

for (const item of queue) {
const { statuses, segment } = item;
const request = this.requests.get(segment);
Expand Down Expand Up @@ -254,7 +276,6 @@ export class HybridLoader {
) {
void this.loadThroughP2P(segment);
}
break;
}
if (statuses.isP2PDownloadable) {
if (request?.status === "loading") continue;
Expand All @@ -264,6 +285,7 @@ export class HybridLoader {
}

if (
this.p2pLoaders.currentLoader.isSegmentLoadedBySomeone(segment) &&
this.abortLastP2PLoadingInQueueAfterItem(queue, segment) &&
this.requests.executingP2PCount < simultaneousP2PDownloads
) {
Expand Down Expand Up @@ -319,15 +341,16 @@ export class HybridLoader {
)) {
if (
!statuses.isHttpDownloadable ||
p2pLoader.isLoadingOrLoadedBySomeone(segment) ||
p2pLoader.isSegmentLoadingOrLoadedBySomeone(segment) ||
this.segmentStorage.hasSegment(segment)
) {
continue;
}
const request = this.requests.get(segment);
if (
request &&
(request.status === "succeed" ||
(request.status === "loading" ||
request.status === "succeed" ||
(request.failedAttempts.httpAttemptsCount ?? 0) >= httpErrorRetries)
) {
continue;
Expand Down Expand Up @@ -403,10 +426,44 @@ export class HybridLoader {
queueSegmentIds,
maxPossibleLength,
alreadyLoadedAmount,
loadedPercent: (alreadyLoadedAmount / maxPossibleLength) * 100,
loadedPercent:
maxPossibleLength !== 0
? (alreadyLoadedAmount / maxPossibleLength) * 100
: 0,
};
}

private getBandwidth(loadedPercentOfQueue: number) {
const { http, all } = this.bandwidthCalculators;
const { activeLevelBitrate } = this.streamDetails;
// console.log("activeLevelBitrate", Math.trunc(activeLevelBitrate / 1000));
if (this.streamDetails.activeLevelBitrate === 0) {
return all.getBandwidthForLastNSplicedSeconds(3);
}
const { levelChangedTimestamp } = this;

const bandwidth = all.getBandwidthForLastNSeconds(
30,
levelChangedTimestamp
);
const realBandwidth = all.getBandwidthForLastNSplicedSeconds(3);
if (loadedPercentOfQueue >= 80 || bandwidth >= activeLevelBitrate * 0.9) {
// console.log("realBandwidth", Math.trunc(realBandwidth / 1000));
// console.log("bandwidth", Math.trunc(bandwidth / 1000));
// console.log("percent", loadedPercentOfQueue);

return realBandwidth;
}
// console.log("bandwidth", loadedPercentOfQueue);
const httpRealBandwidth = http.getBandwidthForLastNSplicedSeconds(3);
return Math.max(bandwidth, httpRealBandwidth);
}

notifyLevelChanged() {
this.levelChangedTimestamp = performance.now();
console.log("LEVEL CHANGED");
}

updatePlayback(position: number, rate: number) {
const isRateChanged = this.playback.rate !== rate;
const isPositionChanged = this.playback.position !== position;
Expand Down
4 changes: 4 additions & 0 deletions packages/p2p-media-loader-core/src/linked-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ export class LinkedMap<K, V extends object> {
getNextTo(key: K): [K, V] | undefined {
return this.map.get(key)?.next?.item;
}

getPrevTo(key: K): [K, V] | undefined {
return this.map.get(key)?.prev?.item;
}
}
9 changes: 8 additions & 1 deletion packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ export class P2PLoader {
peer.downloadSegment(request);
}

isLoadingOrLoadedBySomeone(segment: Segment): boolean {
isSegmentLoadingOrLoadedBySomeone(segment: Segment): boolean {
for (const peer of this.trackerClient.peers()) {
if (peer.getSegmentStatus(segment)) return true;
}
return false;
}

isSegmentLoadedBySomeone(segment: Segment): boolean {
for (const peer of this.trackerClient.peers()) {
if (peer.getSegmentStatus(segment) === "loaded") return true;
}
return false;
}

get connectedPeersAmount() {
let count = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand Down
5 changes: 5 additions & 0 deletions packages/p2p-media-loader-core/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,8 @@ export type BandwidthCalculators = Readonly<{
all: BandwidthCalculator;
http: BandwidthCalculator;
}>;

export type StreamDetails = {
isLive: boolean;
activeLevelBitrate: number;
};
27 changes: 23 additions & 4 deletions packages/p2p-media-loader-core/src/utils/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@ export function* generateQueue(
if (!first) return;

const firstStatuses = getSegmentPlaybackStatuses(first, playback, settings);
// console.log("firstStatuses", firstStatuses, lastRequestedSegment.externalId);
if (isNotActualStatuses(firstStatuses)) {
firstStatuses.isHighDemand = true;
yield { segment: first, statuses: firstStatuses };

let isFirstYield = false;
const prev = stream.segments.getPrevTo(requestedSegmentId)?.[1];
if (prev) {
const prevStatuses = getSegmentPlaybackStatuses(prev, playback, settings);
// console.log(prevStatuses);
if (isNotActualStatuses(prevStatuses)) {
// console.log(prevStatuses);
firstStatuses.isHighDemand = true;
yield { segment: first, statuses: firstStatuses };
isFirstYield = true;
}
}
// for cases when engine requests segment that is a little bit
// earlier than current playhead position
// it could happen when playhead position is significantly changed by user
Expand All @@ -33,7 +43,12 @@ export function* generateQueue(
playback,
settings
);

if (isNotActualStatuses(secondStatuses)) return;
if (!isFirstYield) {
firstStatuses.isHighDemand = true;
yield { segment: first, statuses: firstStatuses };
}
yield { segment: second, statuses: secondStatuses };
} else {
yield { segment: first, statuses: firstStatuses };
Expand All @@ -47,6 +62,10 @@ export function* generateQueue(
}

function isNotActualStatuses(statuses: SegmentPlaybackStatuses) {
const { isHighDemand, isHttpDownloadable, isP2PDownloadable } = statuses;
const {
isHighDemand = false,
isHttpDownloadable = false,
isP2PDownloadable = false,
} = statuses;
return !isHighDemand && !isHttpDownloadable && !isP2PDownloadable;
}
Loading

0 comments on commit b2f7215

Please sign in to comment.