Skip to content

Commit

Permalink
Create hybrid loaders when necessary data is already set.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Sep 20, 2023
1 parent aa369ab commit 2bb356e
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 60 deletions.
46 changes: 27 additions & 19 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@ export class Core<TStream extends Stream = Stream> {
cachedSegmentsCount: 50,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private readonly mainStreamLoader: HybridLoader = new HybridLoader(
this.settings,
this.bandwidthApproximator
);
private mainStreamLoader?: HybridLoader;
private secondaryStreamLoader?: HybridLoader;

setManifestResponseUrl(url: string): void {
this.manifestResponseUrl = url.split("?")[0];
this.mainStreamLoader.setStreamManifestUrl(this.manifestResponseUrl);
this.secondaryStreamLoader?.setStreamManifestUrl(this.manifestResponseUrl);
}

hasSegment(segmentLocalId: string): boolean {
Expand Down Expand Up @@ -61,32 +56,23 @@ export class Core<TStream extends Stream = Stream> {

loadSegment(segmentLocalId: string, callbacks: EngineCallbacks) {
const { segment, stream } = this.identifySegment(segmentLocalId);

let loader: HybridLoader;
if (stream.type === "main") {
loader = this.mainStreamLoader;
} else {
this.secondaryStreamLoader =
this.secondaryStreamLoader ??
new HybridLoader(this.settings, this.bandwidthApproximator);
loader = this.secondaryStreamLoader;
}
const loader = this.getStreamHybridLoader(segment, stream);
void loader.loadSegment(segment, stream, callbacks);
}

abortSegmentLoading(segmentId: string): void {
this.mainStreamLoader.abortSegment(segmentId);
this.mainStreamLoader?.abortSegment(segmentId);
this.secondaryStreamLoader?.abortSegment(segmentId);
}

updatePlayback(position: number, rate: number): void {
this.mainStreamLoader.updatePlayback(position, rate);
this.mainStreamLoader?.updatePlayback(position, rate);
this.secondaryStreamLoader?.updatePlayback(position, rate);
}

destroy(): void {
this.streams.clear();
this.mainStreamLoader.destroy();
this.mainStreamLoader?.destroy();
this.secondaryStreamLoader?.destroy();
this.manifestResponseUrl = undefined;
}
Expand All @@ -104,4 +90,26 @@ export class Core<TStream extends Stream = Stream> {

return { segment, stream };
}

private getStreamHybridLoader(segment: Segment, stream: StreamWithSegments) {
if (!this.manifestResponseUrl) {
throw new Error("Manifest response url is not defined");
}
const streamTypeLoaderKeyMap = {
main: "mainStreamLoader",
secondary: "secondaryStreamLoader",
} as const;
const { type } = stream;
const loaderKey = streamTypeLoaderKeyMap[type];

return (this[loaderKey] =
this[loaderKey] ??
new HybridLoader(
this.manifestResponseUrl,
segment,
stream,
this.settings,
this.bandwidthApproximator
));
}
}
31 changes: 11 additions & 20 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,28 @@ import * as Utils from "./utils";
import { FetchError } from "./errors";

export class HybridLoader {
private streamManifestUrl?: string;
private readonly requests = new RequestContainer();
private p2pLoader?: P2PLoader;
private readonly segmentStorage: SegmentsMemoryStorage;
private storageCleanUpIntervalId?: number;
private activeStream?: Readonly<StreamWithSegments>;
private lastRequestedSegment?: Readonly<Segment>;
private playback?: Playback;
private activeStream: Readonly<StreamWithSegments>;
private lastRequestedSegment: Readonly<Segment>;
private readonly playback: Playback;
private lastQueueProcessingTimeStamp?: number;

constructor(
private streamManifestUrl: string,
requestedSegment: Segment,
requestedStream: Readonly<StreamWithSegments>,
private readonly settings: Settings,
private readonly bandwidthApproximator: BandwidthApproximator
) {
this.lastRequestedSegment = requestedSegment;
this.activeStream = requestedStream;
this.playback = { position: requestedSegment.startTime, rate: 1 };
this.segmentStorage = new SegmentsMemoryStorage(this.settings);
this.segmentStorage.setIsSegmentLockedPredicate((segment) => {
if (!this.playback || !this.activeStream?.segments.has(segment.localId)) {
if (!this.activeStream.segments.has(segment.localId)) {
return false;
}
const bufferRanges = Utils.getLoadBufferRanges(
Expand All @@ -42,12 +47,7 @@ export class HybridLoader {
);
}

setStreamManifestUrl(url: string) {
this.streamManifestUrl = url;
}

private createP2PLoader(stream: StreamWithSegments) {
if (!this.streamManifestUrl) return;
this.p2pLoader = new P2PLoader(
this.streamManifestUrl,
stream,
Expand All @@ -62,9 +62,6 @@ export class HybridLoader {
stream: Readonly<StreamWithSegments>,
callbacks: EngineCallbacks
) {
if (!this.playback) {
this.playback = { position: segment.startTime, rate: 1 };
}
if (stream !== this.activeStream) {
this.activeStream = stream;
this.createP2PLoader(stream);
Expand All @@ -85,9 +82,6 @@ export class HybridLoader {
}

private async processQueue(force = true) {
if (!this.activeStream || !this.lastRequestedSegment || !this.playback) {
return;
}
const now = performance.now();
if (
!force &&
Expand All @@ -98,13 +92,12 @@ export class HybridLoader {
}
this.lastQueueProcessingTimeStamp = now;

const storedSegmentIds = await this.segmentStorage.getStoredSegmentIds();
const { queue, queueSegmentIds } = Utils.generateQueue({
segment: this.lastRequestedSegment,
stream: this.activeStream,
playback: this.playback,
settings: this.settings,
isSegmentLoaded: (segmentId) => storedSegmentIds.has(segmentId),
isSegmentLoaded: (segmentId) => this.segmentStorage.hasSegment(segmentId),
});

this.requests.abortAllNotRequestedByEngine((segmentId) =>
Expand Down Expand Up @@ -175,7 +168,6 @@ export class HybridLoader {
}

updatePlayback(position: number, rate: number) {
if (!this.playback) return;
const isRateChanged = this.playback.rate !== rate;
const isPositionChanged = this.playback.position !== position;

Expand All @@ -191,7 +183,6 @@ export class HybridLoader {
this.storageCleanUpIntervalId = undefined;
void this.segmentStorage.destroy();
this.requests.destroy();
this.playback = undefined;
}
}

Expand Down
16 changes: 11 additions & 5 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ type Request = {
engineCallbacks?: Readonly<EngineCallbacks>;
};

function getRequestItemId(segment: Segment) {
return segment.localId;
}

export class RequestContainer {
private readonly requests = new Map<string, Request>();

addLoaderRequest(segment: Segment, loaderRequest: HybridLoaderRequest) {
const segmentId = segment.localId;
const segmentId = getRequestItemId(segment);
const existingRequest = this.requests.get(segmentId);
if (existingRequest) {
existingRequest.loaderRequest = loaderRequest;
Expand All @@ -41,13 +45,13 @@ export class RequestContainer {
loaderRequest,
});
}
loaderRequest.promise.finally(() =>
loaderRequest.promise.then(() =>
this.clearRequestItem(segmentId, "loader")
);
}

addEngineCallbacks(segment: Segment, engineCallbacks: EngineCallbacks) {
const segmentId = segment.localId;
const segmentId = getRequestItemId(segment);
const requestItem = this.requests.get(segmentId);
if (requestItem) {
requestItem.engineCallbacks = engineCallbacks;
Expand Down Expand Up @@ -125,7 +129,8 @@ export class RequestContainer {
if (type === "engine") delete requestItem.engineCallbacks;
if (type === "loader") delete requestItem.loaderRequest;
if (!requestItem.engineCallbacks && !requestItem.loaderRequest) {
this.requests.delete(requestItem.segment.localId);
const segmentId = getRequestItemId(requestItem.segment);
this.requests.delete(segmentId);
}
}

Expand All @@ -136,7 +141,8 @@ export class RequestContainer {
segment,
} of this.requests.values()) {
if (!engineCallbacks) continue;
if (!isLocked(segment.localId) && loaderRequest) loaderRequest.abort();
const segmentId = getRequestItemId(segment);
if (!isLocked(segmentId) && loaderRequest) loaderRequest.abort();
}
}

Expand Down
37 changes: 24 additions & 13 deletions packages/p2p-media-loader-core/src/segments-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export class SegmentsMemoryStorage {
string,
{ segment: Segment; data: ArrayBuffer; lastAccessed: number }
>();
private readonly cachedSegmentIds = new Set<string>();
private isSegmentLockedPredicate?: (segment: Segment) => boolean;
private onUpdateSubscriptions: (() => void)[] = [];

Expand All @@ -15,6 +16,10 @@ export class SegmentsMemoryStorage {
}
) {}

async initialize(masterManifestUrl: string) {
// empty
}

setIsSegmentLockedPredicate(predicate: (segment: Segment) => boolean) {
this.isSegmentLockedPredicate = predicate;
}
Expand All @@ -24,28 +29,28 @@ export class SegmentsMemoryStorage {
}

async storeSegment(segment: Segment, data: ArrayBuffer) {
this.cache.set(segment.localId, {
const id = segment.externalId;
this.cache.set(id, {
segment,
data,
lastAccessed: performance.now(),
});
this.cachedSegmentIds.add(id);
this.onUpdateSubscriptions.forEach((c) => c());
}

async getSegmentData(segmentId: string): Promise<ArrayBuffer | undefined> {
const cacheItem = this.cache.get(segmentId);
async getSegmentData(
segmentExternalId: string
): Promise<ArrayBuffer | undefined> {
const cacheItem = this.cache.get(segmentExternalId);
if (cacheItem === undefined) return undefined;

cacheItem.lastAccessed = performance.now();
return cacheItem.data;
}

async getStoredSegmentIds() {
const segmentIds = new Set<string>();
for (const segmentId of this.cache.keys()) {
segmentIds.add(segmentId);
}
return segmentIds;
hasSegment(segmentExternalId: string): boolean {
return this.cachedSegmentIds.has(segmentExternalId);
}

async clear(): Promise<boolean> {
Expand All @@ -58,10 +63,13 @@ export class SegmentsMemoryStorage {
// Delete old segments
const now = performance.now();

for (const [segmentId, { lastAccessed, segment }] of this.cache.entries()) {
for (const [
segmentExternalId,
{ lastAccessed, segment },
] of this.cache.entries()) {
if (now - lastAccessed > this.settings.cachedSegmentExpiration) {
if (!this.isSegmentLockedPredicate?.(segment)) {
segmentsToDelete.push(segmentId);
segmentsToDelete.push(segmentExternalId);
}
} else {
remainingSegments.push({ segment, lastAccessed });
Expand All @@ -76,14 +84,17 @@ export class SegmentsMemoryStorage {

for (const cachedSegment of remainingSegments) {
if (!this.isSegmentLockedPredicate?.(cachedSegment.segment)) {
segmentsToDelete.push(cachedSegment.segment.localId);
segmentsToDelete.push(cachedSegment.segment.externalId);
countOverhead--;
if (countOverhead === 0) break;
}
}
}

segmentsToDelete.forEach((id) => this.cache.delete(id));
segmentsToDelete.forEach((id) => {
this.cache.delete(id);
this.cachedSegmentIds.delete(id);
});
if (segmentsToDelete.length) {
this.onUpdateSubscriptions.forEach((c) => c());
}
Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export type ByteRange = { start: number; end: number };

export type Segment = {
readonly localId: string;
readonly externalId: number;
readonly externalId: string;
readonly url: string;
readonly byteRange?: ByteRange;
readonly startTime: number;
Expand Down
4 changes: 2 additions & 2 deletions packages/p2p-media-loader-core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function generateQueue({
stream: Readonly<StreamWithSegments>;
segment: Readonly<Segment>;
playback: Readonly<Playback>;
isSegmentLoaded: (segmentId: string) => boolean;
isSegmentLoaded: (segmentExternalId: string) => boolean;
settings: Pick<
Settings,
"highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow"
Expand All @@ -64,7 +64,7 @@ export function generateQueue({
for (const segment of stream.segments.values(requestedSegmentId)) {
const statuses = getSegmentLoadStatuses(segment, bufferRanges);
if (!statuses && !(i === 0 && isNextSegmentHighDemand)) break;
if (isSegmentLoaded(segment.localId)) continue;
if (isSegmentLoaded(segment.externalId)) continue;

queueSegmentIds.add(segment.localId);
statuses.isHighDemand = true;
Expand Down

0 comments on commit 2bb356e

Please sign in to comment.