Skip to content

Commit

Permalink
Add bandwidth calculator.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Dec 29, 2023
1 parent b119d6f commit 41f6292
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 86 deletions.
113 changes: 52 additions & 61 deletions packages/p2p-media-loader-core/src/bandwidth-calculator.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,74 @@
import { LoadProgress } from "./request";
import { arrayBackwards } from "./utils/utils";

export class BandwidthCalculator {
private readonly loadings: LoadProgress[] = [];

addLoading(progress: LoadProgress) {
this.clearStale();
this.loadings.push(progress);
}

// in bits per second
getBandwidth(): number {
this.clearStale();
return getBandwidthByProgressList(this.loadings);
}

private clearStale() {
const now = performance.now();
for (const { startTimestamp } of this.loadings) {
if (now - startTimestamp <= 15000) break;
this.loadings.shift();
}
}

destroy() {
this.loadings.length = 0;
}
}

function getBandwidthByProgressList(loadings: LoadProgress[]) {
if (!loadings.length) return 0;
let margin: number | undefined;
let totalLoadingTime = 0;
let totalBytes = 0;
const now = performance.now();

for (const {
startTimestamp: from,
lastLoadedChunkTimestamp: to = now,
loadedBytes,
} of loadings) {
totalBytes += loadedBytes;

if (margin === undefined || from > margin) {
margin = to;
totalLoadingTime += to - from;
continue;
}

if (from <= margin && to > margin) {
totalLoadingTime += to - margin;
margin = to;
}
}

return (totalBytes * 8000) / totalLoadingTime;
}

class BandwidthCalculator1 {
private simultaneousLoadingsCount = 0;
private readonly bytes: number[] = [];
private readonly timestamps: number[] = [];
private loadingIntervals: { start: number; end?: number }[] = [];

addBytes(bytesLength: number) {
this.bytes.push(bytesLength);
this.timestamps.push(performance.now());
}

startLoading() {
this.clearStale();
if (this.simultaneousLoadingsCount === 0) {
this.loadingIntervals.push({ start: performance.now() });
}
this.simultaneousLoadingsCount++;
}

stopLoading() {
if (this.simultaneousLoadingsCount === 0) return;
this.clearStale();
if (this.simultaneousLoadingsCount <= 0) return;
this.simultaneousLoadingsCount--;
if (this.simultaneousLoadingsCount !== 0) return;
this.loadingIntervals[this.loadingIntervals.length - 1].end =
performance.now();
}

private clearStale() {
const length = this.bytes.length;
// in bits per second
getBandwidthForLastNSeconds(seconds: number) {
this.clearStale();
const { bytes, timestamps, loadingIntervals } = this;
const samplesLength = bytes.length;
const now = performance.now();
const threshold = now - seconds * 1000;

let loadedBytes = 0;
for (let i = samplesLength - 1; i >= 0; i--) {
if (timestamps[i] < threshold) break;
loadedBytes += bytes[i];
}

let clearLoadingTime = 0;
for (const { start, end } of arrayBackwards(loadingIntervals)) {
if (start < threshold && end !== undefined && end < threshold) break;
const from = Math.max(start, threshold);
const to = end ?? now;
clearLoadingTime += to - from;
}

if (clearLoadingTime === 0) return 0;
return (loadedBytes * 8000) / clearLoadingTime;
}

for (let i = 0; i < length; i++) {}
private clearStale() {
const { timestamps, bytes, loadingIntervals } = this;
const samplesLength = bytes.length;
const threshold = performance.now() - 15000;

let count = 0;
while (count < samplesLength && timestamps[count] < threshold) count++;
bytes.splice(0, count);
timestamps.splice(0, count);

count = 0;
for (const { start, end } of loadingIntervals) {
if (!(start < threshold && end !== undefined && end <= threshold)) break;
count++;
}
loadingIntervals.splice(0, count);
}
}
5 changes: 2 additions & 3 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class Core<TStream extends Stream = Stream> {
httpErrorRetries: 3,
p2pErrorRetries: 3,
};
private readonly bandwidthApproximator = new BandwidthCalculator();
private readonly bandwidthCalculator = new BandwidthCalculator();
private segmentStorage?: SegmentsMemoryStorage;
private mainStreamLoader?: HybridLoader;
private secondaryStreamLoader?: HybridLoader;
Expand Down Expand Up @@ -113,7 +113,6 @@ export class Core<TStream extends Stream = Stream> {
this.mainStreamLoader = undefined;
this.secondaryStreamLoader = undefined;
this.segmentStorage = undefined;
this.bandwidthApproximator.destroy();
this.manifestResponseUrl = undefined;
}

Expand Down Expand Up @@ -145,7 +144,7 @@ export class Core<TStream extends Stream = Stream> {
manifestResponseUrl,
segment,
this.settings,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.segmentStorage,
this.eventHandlers
);
Expand Down
16 changes: 5 additions & 11 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class HybridLoader {
private streamManifestUrl: string,
requestedSegment: Segment,
private readonly settings: Settings,
private readonly bandwidthApproximator: BandwidthCalculator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly eventHandlers?: Pick<CoreEventHandlers, "onSegmentLoaded">
) {
Expand All @@ -40,7 +40,7 @@ export class HybridLoader {
this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream);
this.requests = new RequestsContainer(
this.requestProcessQueueMicrotask,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.playback,
this.settings
);
Expand Down Expand Up @@ -94,7 +94,7 @@ export class HybridLoader {
if (data) {
callbacks.onSuccess({
data,
bandwidth: this.bandwidthApproximator.getBandwidth(),
bandwidth: this.bandwidthCalculator.getBandwidthForLastNSeconds(3),
});
}
} else {
Expand Down Expand Up @@ -344,7 +344,7 @@ export class HybridLoader {
queue: QueueUtils.QueueItem[],
segment: Segment
): boolean {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
if (itemSegment === segment) break;
const request = this.requests.get(itemSegment);
if (request?.type === "http" && request.status === "loading") {
Expand All @@ -359,7 +359,7 @@ export class HybridLoader {
queue: QueueUtils.QueueItem[],
segment: Segment
): boolean {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
if (itemSegment === segment) break;
const request = this.requests.get(itemSegment);
if (request?.type === "p2p" && request.status === "loading") {
Expand Down Expand Up @@ -403,9 +403,3 @@ export class HybridLoader {
this.logger.destroy();
}
}

function* arrayBackwards<T>(arr: T[]) {
for (let i = arr.length - 1; i >= 0; i--) {
yield arr[i];
}
}
5 changes: 3 additions & 2 deletions packages/p2p-media-loader-core/src/p2p/tracker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ export class P2PTrackerClient {
private readonly settings: Settings
) {
const { string: peerId, bytes: peerIdBytes } = PeerUtil.generatePeerId();
const { bytes: streamIdBytes, string: streamHash } =
PeerUtil.getStreamHash(streamId);
const { bytes: streamIdBytes, string: streamHash } = PeerUtil.getStreamHash(
streamId + "a"
);
this.peerId = peerId;
this.streamShortId = LoggerUtils.getStreamString(stream);

Expand Down
4 changes: 2 additions & 2 deletions packages/p2p-media-loader-core/src/request-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class RequestsContainer {

constructor(
private readonly requestProcessQueueCallback: () => void,
private readonly bandwidthApproximator: BandwidthCalculator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly playback: Playback,
private readonly settings: Settings
) {}
Expand Down Expand Up @@ -44,7 +44,7 @@ export class RequestsContainer {
request = new Request(
segment,
this.requestProcessQueueCallback,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.playback,
this.settings
);
Expand Down
19 changes: 12 additions & 7 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class Request {
constructor(
readonly segment: Segment,
private readonly requestProcessQueueCallback: () => void,
private readonly bandwidthApproximator: BandwidthCalculator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly playback: Playback,
private readonly settings: StreamUtils.PlaybackTimeWindowsSettings
) {
Expand Down Expand Up @@ -179,7 +179,7 @@ export class Request {
loadedBytes: 0,
startTimestamp: performance.now(),
};
this.bandwidthApproximator.addLoading(this.progress);
this.bandwidthCalculator.startLoading();
const { notReceivingBytesTimeoutMs, abort } = controls;
this._abortRequestCallback = abort;

Expand Down Expand Up @@ -207,10 +207,11 @@ export class Request {

resolveEngineCallbacksSuccessfully() {
if (!this.finalData) return;
this._engineCallbacks?.onSuccess({
data: this.finalData,
bandwidth: this.bandwidthApproximator.getBandwidth(),
});
const bandwidth = this.bandwidthCalculator.getBandwidthForLastNSeconds(3);
const bandwidth6 = this.bandwidthCalculator.getBandwidthForLastNSeconds(6);
console.log("bandwidth", bandwidth / 1000);
console.log("bandwidth6", bandwidth6 / 1000);
this._engineCallbacks?.onSuccess({ data: this.finalData, bandwidth });
this._engineCallbacks = undefined;
}

Expand All @@ -236,6 +237,7 @@ export class Request {
this._abortRequestCallback = undefined;
this.currentAttempt = undefined;
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
}

private abortOnTimeout = () => {
Expand All @@ -252,6 +254,7 @@ export class Request {
error,
});
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
this.requestProcessQueueCallback();
};

Expand All @@ -266,19 +269,20 @@ export class Request {
error,
});
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
this.requestProcessQueueCallback();
};

private completeOnSuccess = () => {
this.throwErrorIfNotLoadingStatus();
if (!this.currentAttempt) return;

this.bandwidthCalculator.stopLoading();
this.notReceivingBytesTimeout.clear();
this.finalData = Utils.joinChunks(this.bytes);
this.setStatus("succeed");
this._totalBytes = this._loadedBytes;

this.resolveEngineCallbacksSuccessfully();
this.logger(
`${this.currentAttempt.type} ${this.segment.externalId} succeed`
);
Expand All @@ -290,6 +294,7 @@ export class Request {
if (!this.currentAttempt || !this.progress) return;
this.notReceivingBytesTimeout.restart();

this.bandwidthCalculator.addBytes(chunk.length);
this.bytes.push(chunk);
this.progress.lastLoadedChunkTimestamp = performance.now();
this.progress.loadedBytes += chunk.length;
Expand Down
6 changes: 6 additions & 0 deletions packages/p2p-media-loader-core/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ export function hexToUtf8(hexString: string) {
const decoder = new TextDecoder();
return decoder.decode(bytes);
}

export function* arrayBackwards<T>(arr: T[]) {
for (let i = arr.length - 1; i >= 0; i--) {
yield arr[i];
}
}

0 comments on commit 41f6292

Please sign in to comment.