Skip to content

Commit

Permalink
Refactor http loader. Throw error and clear already loaded data on by…
Browse files Browse the repository at this point in the history
…tes mismatch.
  • Loading branch information
i-zolotarenko committed Dec 29, 2023
1 parent c3ea983 commit e65b6e5
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 150 deletions.
4 changes: 2 additions & 2 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export class Core<TStream extends Stream = Stream> {
p2pNotReceivingBytesTimeoutMs: 1000,
p2pLoaderDestroyTimeoutMs: 30 * 1000,
httpNotReceivingBytesTimeoutMs: 1000,
maxHttpFailedDownloadAttempts: 3,
maxPeerNotReceivingBytesTimeoutErrors: 3,
httpErrorRetries: 3,
p2pErrorRetries: 3,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private segmentStorage?: SegmentsMemoryStorage;
Expand Down
211 changes: 121 additions & 90 deletions packages/p2p-media-loader-core/src/http-loader.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,139 @@
import { Settings } from "./types";
import { Request, RequestError, HttpRequestErrorType } from "./request";

export async function fulfillHttpSegmentRequest(
request: Request,
settings: Pick<Settings, "httpNotReceivingBytesTimeoutMs">
) {
const requestHeaders = new Headers();
const { segment, loadedBytes: alreadyLoadedBytes } = request;
const { url, byteRange } = segment;

let byteFrom = byteRange?.start;
const byteTo = byteRange?.end;
if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes;

if (byteFrom !== undefined) {
const byteRangeString = `bytes=${byteFrom}-${byteTo ?? ""}`;
requestHeaders.set("Range", byteRangeString);
}
import {
Request,
RequestError,
HttpRequestErrorType,
RequestControls,
} from "./request";

type HttpSettings = Pick<Settings, "httpNotReceivingBytesTimeoutMs">;

const abortController = new AbortController();
const requestControls = request.start(
{ type: "http" },
{
abort: () => abortController.abort("abort"),
notReceivingBytesTimeoutMs: settings.httpNotReceivingBytesTimeoutMs,
export class HttpRequestExecutor {
private readonly requestControls: RequestControls;
private readonly requestHeaders = new Headers();
private readonly abortController = new AbortController();
private readonly expectedBytesLength?: number;
private readonly byteRange?: { start: number; end?: number };

constructor(
private readonly request: Request,
private readonly settings: HttpSettings
) {
const { byteRange } = this.request.segment;
if (byteRange) this.byteRange = { ...byteRange };

if (request.loadedBytes !== 0) {
this.byteRange = this.byteRange ?? { start: 0 };
this.byteRange.start = this.byteRange.start + request.loadedBytes;
}
);
try {
const fetchResponse = await window.fetch(url, {
headers: requestHeaders,
signal: abortController.signal,
});
if (!fetchResponse.ok) {
throw new RequestError("fetch-error", fetchResponse.statusText);
if (this.request.totalBytes) {
this.expectedBytesLength =
this.request.totalBytes - this.request.loadedBytes;
}
if (!fetchResponse.body) return;
requestControls.firstBytesReceived();

if (
byteFrom !== undefined &&
(fetchResponse.status !== 206 ||
!isResponseWithRequestedContentRange(fetchResponse, byteFrom, byteTo))
) {
request.clearLoadedBytes();

if (this.byteRange) {
const { start, end } = this.byteRange;
this.requestHeaders.set("Range", `bytes=${start}-${end ?? ""}`);
}

if (request.totalBytes === undefined) {
const totalBytesString = fetchResponse.headers.get("Content-Length");
if (totalBytesString) request.setTotalBytes(+totalBytesString);
const { httpNotReceivingBytesTimeoutMs } = this.settings;
this.requestControls = this.request.start(
{ type: "http" },
{
abort: () => this.abortController.abort("abort"),
notReceivingBytesTimeoutMs: httpNotReceivingBytesTimeoutMs,
}
);
void this.fetch();
}

private async fetch() {
const { segment } = this.request;
try {
const response = await window.fetch(segment.url, {
headers: this.requestHeaders,
signal: this.abortController.signal,
});
this.handleResponseHeaders(response);

if (!response.body) return;
const { requestControls } = this;
requestControls.firstBytesReceived();

const reader = response.body.getReader();
for await (const chunk of readStream(reader)) {
this.requestControls.addLoadedChunk(chunk);
}
requestControls.completeOnSuccess();
} catch (error) {
this.handleError(error);
}
}

const reader = fetchResponse.body.getReader();
for await (const chunk of readStream(reader)) {
requestControls.addLoadedChunk(chunk);
private handleResponseHeaders(response: Response) {
if (!response.ok) {
if (response.status === 406) {
this.request.clearLoadedBytes();
throw new RequestError("http-bytes-mismatch", response.statusText);
} else {
throw new RequestError("http-error", response.statusText);
}
}
requestControls.completeOnSuccess();
} catch (error) {

const { byteRange } = this;
if (byteRange) {
if (response.status === 200) {
this.request.clearLoadedBytes();
} else {
if (response.status !== 206) {
this.request.clearLoadedBytes();
throw new RequestError("http-bytes-mismatch", response.statusText);
}
const contentLengthHeader = response.headers.get("Content-Length");
if (
contentLengthHeader &&
this.expectedBytesLength !== +contentLengthHeader
) {
this.request.clearLoadedBytes();
throw new RequestError("http-bytes-mismatch", response.statusText);
}

const contentRangeHeader = response.headers.get("Content-Range");
const contentRange = contentRangeHeader
? parseContentRangeHeader(contentRangeHeader)
: undefined;
if (contentRange) {
const { from, to, total } = contentRange;
if (
(total !== undefined && this.request.totalBytes !== total) ||
(from !== undefined && byteRange.start !== from) ||
(to !== undefined &&
byteRange.end !== undefined &&
byteRange.end !== to)
) {
this.request.clearLoadedBytes();
throw new RequestError("http-bytes-mismatch", response.statusText);
}
}
}
}

if (response.status === 200 && this.request.totalBytes === undefined) {
const contentLengthHeader = response.headers.get("Content-Length");
if (contentLengthHeader) this.request.setTotalBytes(+contentLengthHeader);
}
}

private handleError(error: unknown) {
if (error instanceof Error) {
if (error.name !== "abort") return;

const httpLoaderError: RequestError<HttpRequestErrorType> = !(
error instanceof RequestError
)
? new RequestError("fetch-error", error.message)
? new RequestError("http-error", error.message)
: error;
requestControls.abortOnError(httpLoaderError);
this.requestControls.abortOnError(httpLoaderError);
}
}
}
Expand All @@ -79,7 +148,7 @@ async function* readStream(
}
}

function getValueFromContentRangeHeader(headerValue: string) {
function parseContentRangeHeader(headerValue: string) {
const match = headerValue
.trim()
.match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/);
Expand All @@ -92,41 +161,3 @@ function getValueFromContentRangeHeader(headerValue: string) {
total: total ? parseInt(total) : undefined,
};
}

function isResponseWithRequestedContentRange(
response: Response,
requestedFromByte: number,
requestedToByte?: number
): boolean {
const requestedBytesAmount =
requestedToByte !== undefined
? requestedToByte - requestedFromByte + 1
: undefined;

const { headers } = response;
const contentLengthHeader = headers.get("Content-Length");
const contentLength = contentLengthHeader && parseInt(contentLengthHeader);

if (
contentLength &&
requestedBytesAmount !== undefined &&
requestedBytesAmount !== contentLength
) {
return false;
}

const contentRangeHeader = headers.get("Content-Range");
const contentRange =
contentRangeHeader && getValueFromContentRangeHeader(contentRangeHeader);
if (!contentRange) return true;
const { from, to } = contentRange;
if (from !== requestedFromByte) return false;
if (
to !== undefined &&
requestedToByte !== undefined &&
to !== requestedToByte
) {
return false;
}
return true;
}
21 changes: 10 additions & 11 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Segment, StreamWithSegments } from "./index";
import { fulfillHttpSegmentRequest } from "./http-loader";
import { HttpRequestExecutor } from "./http-loader";
import { SegmentsMemoryStorage } from "./segments-storage";
import { Settings, CoreEventHandlers, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
Expand Down Expand Up @@ -126,13 +126,13 @@ export class HybridLoader {

private processRequests(queueSegmentIds: Set<string>) {
const { stream } = this.lastRequestedSegment;
const { maxHttpFailedDownloadAttempts } = this.settings;
const { httpErrorRetries } = this.settings;
for (const request of this.requests.items()) {
const {
type,
status,
segment,
isCheckedByProcessQueue,
isHandledByProcessQueue,
isSegmentRequestedByEngine,
} = request;

Expand Down Expand Up @@ -161,7 +161,7 @@ export class HybridLoader {
break;

case "failed":
if (type === "http" && !isCheckedByProcessQueue) {
if (type === "http" && !isHandledByProcessQueue) {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
if (
Expand All @@ -171,8 +171,7 @@ export class HybridLoader {
this.requests.remove(request);
}
if (
request.failedAttempts.httpAttemptsCount >=
maxHttpFailedDownloadAttempts &&
request.failedAttempts.httpAttemptsCount >= httpErrorRetries &&
isSegmentRequestedByEngine
) {
request.resolveEngineCallbacksWithError();
Expand All @@ -187,11 +186,12 @@ export class HybridLoader {
this.requests.remove(request);
break;
}
request.markCheckedByProcessQueue();
request.markHandledByProcessQueue();
}
}

private processQueue() {
console.log("process queue");
const { queue, queueSegmentIds } = QueueUtils.generateQueue({
lastRequestedSegment: this.lastRequestedSegment,
playback: this.playback,
Expand All @@ -208,7 +208,7 @@ export class HybridLoader {
const {
simultaneousHttpDownloads,
simultaneousP2PDownloads,
maxHttpFailedDownloadAttempts,
httpErrorRetries,
} = this.settings;

for (const item of queue) {
Expand All @@ -220,8 +220,7 @@ export class HybridLoader {
if (
request?.type === "http" &&
request.status === "failed" &&
request.failedAttempts.httpAttemptsCount >=
maxHttpFailedDownloadAttempts
request.failedAttempts.httpAttemptsCount >= httpErrorRetries
) {
break;
}
Expand Down Expand Up @@ -287,7 +286,7 @@ export class HybridLoader {

private async loadThroughHttp(segment: Segment) {
const request = this.requests.getOrCreateRequest(segment);
void fulfillHttpSegmentRequest(request, this.settings);
new HttpRequestExecutor(request, this.settings);
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}

Expand Down
4 changes: 1 addition & 3 deletions packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import { Settings } from "../types";

export type PeerSettings = Pick<
Settings,
| "p2pNotReceivingBytesTimeoutMs"
| "webRtcMaxMessageSize"
| "maxPeerNotReceivingBytesTimeoutErrors"
"p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" | "p2pErrorRetries"
>;

export class PeerProtocol {
Expand Down
Loading

0 comments on commit e65b6e5

Please sign in to comment.