diff --git a/p2p-media-loader-demo/index.html b/p2p-media-loader-demo/index.html
index 60d8cc32..cc9c4afa 100644
--- a/p2p-media-loader-demo/index.html
+++ b/p2p-media-loader-demo/index.html
@@ -1,17 +1,17 @@
-
-
- Vite + React + TS
-
-
-
-
-
+
+
+ Vite + React + TS
+
+
+
+
+
diff --git a/p2p-media-loader-demo/src/App.tsx b/p2p-media-loader-demo/src/App.tsx
index e367376a..e3c6b214 100644
--- a/p2p-media-loader-demo/src/App.tsx
+++ b/p2p-media-loader-demo/src/App.tsx
@@ -22,6 +22,8 @@ const streamUrls = {
hlsBigBunnyBuck: "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8",
hlsByteRangeVideo:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8",
+ hlsOneLevelByteRangeVideo:
+ "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/gear1/prog_index.m3u8",
hlsBasicExample:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_4x3/bipbop_4x3_variant.m3u8",
hlsAdvancedVideo:
diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts
index 3aab237c..39daa6cc 100644
--- a/packages/p2p-media-loader-core/src/core.ts
+++ b/packages/p2p-media-loader-core/src/core.ts
@@ -28,6 +28,8 @@ export class Core {
p2pNotReceivingBytesTimeoutMs: 1000,
p2pLoaderDestroyTimeoutMs: 30 * 1000,
httpNotReceivingBytesTimeoutMs: 1000,
+ httpErrorRetries: 3,
+ p2pErrorRetries: 3,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private segmentStorage?: SegmentsMemoryStorage;
diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts
index d81d9d3b..7beac7a3 100644
--- a/packages/p2p-media-loader-core/src/http-loader.ts
+++ b/packages/p2p-media-loader-core/src/http-loader.ts
@@ -1,58 +1,146 @@
import { Settings } from "./types";
-import { Request, RequestError, HttpRequestErrorType } from "./request";
-
-export async function fulfillHttpSegmentRequest(
- request: Request,
- settings: Pick
-) {
- const headers = new Headers();
- const { segment } = request;
- const { url, byteRange } = segment;
-
- if (byteRange) {
- const { start, end } = byteRange;
- const byteRangeString = `bytes=${start}-${end}`;
- headers.set("Range", byteRangeString);
+import {
+ Request,
+ RequestError,
+ HttpRequestErrorType,
+ RequestControls,
+} from "./request";
+
+type HttpSettings = Pick;
+
+export class HttpRequestExecutor {
+ private readonly requestControls: RequestControls;
+ private readonly requestHeaders = new Headers();
+ private readonly abortController = new AbortController();
+ private readonly expectedBytesLength?: number;
+ private readonly byteRange?: { start: number; end?: number };
+
+ constructor(
+ private readonly request: Request,
+ private readonly settings: HttpSettings
+ ) {
+ const { byteRange } = this.request.segment;
+ if (byteRange) this.byteRange = { ...byteRange };
+
+ if (request.loadedBytes !== 0) {
+ this.byteRange = this.byteRange ?? { start: 0 };
+ this.byteRange.start = this.byteRange.start + request.loadedBytes;
+ }
+ if (this.request.totalBytes) {
+ this.expectedBytesLength =
+ this.request.totalBytes - this.request.loadedBytes;
+ }
+
+ if (this.byteRange) {
+ const { start, end } = this.byteRange;
+ this.requestHeaders.set("Range", `bytes=${start}-${end ?? ""}`);
+ }
+
+ const { httpNotReceivingBytesTimeoutMs } = this.settings;
+ this.requestControls = this.request.start(
+ { type: "http" },
+ {
+ abort: () => this.abortController.abort("abort"),
+ notReceivingBytesTimeoutMs: httpNotReceivingBytesTimeoutMs,
+ }
+ );
+ void this.fetch();
}
- const abortController = new AbortController();
- const requestControls = request.start(
- { type: "http" },
- {
- abort: () => abortController.abort("abort"),
- notReceivingBytesTimeoutMs: settings.httpNotReceivingBytesTimeoutMs,
+ private async fetch() {
+ const { segment } = this.request;
+ try {
+ const response = await window.fetch(segment.url, {
+ headers: this.requestHeaders,
+ signal: this.abortController.signal,
+ });
+ this.handleResponseHeaders(response);
+
+ if (!response.body) return;
+ const { requestControls } = this;
+ requestControls.firstBytesReceived();
+
+ const reader = response.body.getReader();
+ for await (const chunk of readStream(reader)) {
+ this.requestControls.addLoadedChunk(chunk);
+ }
+ requestControls.completeOnSuccess();
+ } catch (error) {
+ this.handleError(error);
}
- );
- try {
- const fetchResponse = await window.fetch(url, {
- headers,
- signal: abortController.signal,
- });
- requestControls.firstBytesReceived();
-
- if (!fetchResponse.ok) {
- throw new RequestError("fetch-error", fetchResponse.statusText);
+ }
+
+ private handleResponseHeaders(response: Response) {
+ if (!response.ok) {
+ if (response.status === 406) {
+ this.request.clearLoadedBytes();
+ throw new RequestError("http-bytes-mismatch", response.statusText);
+ } else {
+ throw new RequestError("http-error", response.statusText);
+ }
}
- if (!fetchResponse.body) return;
- const totalBytesString = fetchResponse.headers.get("Content-Length");
- if (totalBytesString) request.setTotalBytes(+totalBytesString);
+ const { byteRange } = this;
+ if (byteRange) {
+ if (response.status === 200) {
+ if (this.request.segment.byteRange) {
+ throw new RequestError("http-unexpected-status-code");
+ } else {
+ this.request.clearLoadedBytes();
+ }
+ } else {
+ if (response.status !== 206) {
+ throw new RequestError(
+ "http-unexpected-status-code",
+ response.statusText
+ );
+ }
+ const contentLengthHeader = response.headers.get("Content-Length");
+ if (
+ contentLengthHeader &&
+ this.expectedBytesLength !== undefined &&
+ this.expectedBytesLength !== +contentLengthHeader
+ ) {
+ this.request.clearLoadedBytes();
+ throw new RequestError("http-bytes-mismatch", response.statusText);
+ }
+
+ const contentRangeHeader = response.headers.get("Content-Range");
+ const contentRange = contentRangeHeader
+ ? parseContentRangeHeader(contentRangeHeader)
+ : undefined;
+ if (contentRange) {
+ const { from, to, total } = contentRange;
+ if (
+ (total !== undefined && this.request.totalBytes !== total) ||
+ (from !== undefined && byteRange.start !== from) ||
+ (to !== undefined &&
+ byteRange.end !== undefined &&
+ byteRange.end !== to)
+ ) {
+ this.request.clearLoadedBytes();
+ throw new RequestError("http-bytes-mismatch", response.statusText);
+ }
+ }
+ }
+ }
- const reader = fetchResponse.body.getReader();
- for await (const chunk of readStream(reader)) {
- requestControls.addLoadedChunk(chunk);
+ if (response.status === 200 && this.request.totalBytes === undefined) {
+ const contentLengthHeader = response.headers.get("Content-Length");
+ if (contentLengthHeader) this.request.setTotalBytes(+contentLengthHeader);
}
- requestControls.completeOnSuccess();
- } catch (error) {
+ }
+
+ private handleError(error: unknown) {
if (error instanceof Error) {
if (error.name !== "abort") return;
const httpLoaderError: RequestError = !(
error instanceof RequestError
)
- ? new RequestError("fetch-error", error.message)
+ ? new RequestError("http-error", error.message)
: error;
- requestControls.abortOnError(httpLoaderError);
+ this.requestControls.abortOnError(httpLoaderError);
}
}
}
@@ -66,3 +154,17 @@ async function* readStream(
yield value;
}
}
+
+function parseContentRangeHeader(headerValue: string) {
+ const match = headerValue
+ .trim()
+ .match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/);
+ if (!match) return;
+
+ const [, from, to, total] = match;
+ return {
+ from: from ? parseInt(from) : undefined,
+ to: to ? parseInt(to) : undefined,
+ total: total ? parseInt(total) : undefined,
+ };
+}
diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts
index a49d2248..e008baf0 100644
--- a/packages/p2p-media-loader-core/src/hybrid-loader.ts
+++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts
@@ -1,5 +1,5 @@
import { Segment, StreamWithSegments } from "./index";
-import { fulfillHttpSegmentRequest } from "./http-loader";
+import { HttpRequestExecutor } from "./http-loader";
import { SegmentsMemoryStorage } from "./segments-storage";
import { Settings, CoreEventHandlers, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
@@ -12,6 +12,8 @@ import * as StreamUtils from "./utils/stream";
import * as Utils from "./utils/utils";
import debug from "debug";
+const FAILED_ATTEMPTS_CLEAR_INTERVAL = 60000;
+
export class HybridLoader {
private readonly requests: RequestsContainer;
private readonly p2pLoaders: P2PLoadersContainer;
@@ -97,9 +99,8 @@ export class HybridLoader {
}
} else {
const request = this.requests.getOrCreateRequest(segment);
- request.setOrResolveEngineCallbacks(callbacks);
+ request.setEngineCallbacks(callbacks);
}
-
this.requestProcessQueueMicrotask();
}
@@ -125,6 +126,81 @@ export class HybridLoader {
});
};
+ private processRequests(queueSegmentIds: Set) {
+ const { stream } = this.lastRequestedSegment;
+ const { httpErrorRetries } = this.settings;
+ const now = performance.now();
+ for (const request of this.requests.items()) {
+ const {
+ type,
+ status,
+ segment,
+ isHandledByProcessQueue,
+ isSegmentRequestedByEngine,
+ } = request;
+
+ if (!type) continue;
+
+ switch (status) {
+ case "loading":
+ if (
+ !isSegmentRequestedByEngine &&
+ !queueSegmentIds.has(segment.localId)
+ ) {
+ request.abortFromProcessQueue();
+ this.requests.remove(request);
+ }
+ break;
+
+ case "succeed":
+ if (!request.data) break;
+ if (type === "http") {
+ this.p2pLoaders.currentLoader.broadcastAnnouncement();
+ }
+ request.resolveEngineCallbacksSuccessfully();
+ void this.segmentStorage.storeSegment(request.segment, request.data);
+ this.eventHandlers?.onSegmentLoaded?.(request.data.byteLength, type);
+ this.requests.remove(request);
+ break;
+
+ case "failed":
+ if (type === "http" && !isHandledByProcessQueue) {
+ this.p2pLoaders.currentLoader.broadcastAnnouncement();
+ }
+ if (
+ !isSegmentRequestedByEngine &&
+ !stream.segments.has(request.segment.localId)
+ ) {
+ this.requests.remove(request);
+ }
+ if (
+ request.failedAttempts.httpAttemptsCount >= httpErrorRetries &&
+ isSegmentRequestedByEngine
+ ) {
+ request.resolveEngineCallbacksWithError();
+ }
+ break;
+
+ case "not-started":
+ this.requests.remove(request);
+ break;
+
+ case "aborted":
+ this.requests.remove(request);
+ break;
+ }
+ request.markHandledByProcessQueue();
+
+ const { lastAttempt } = request.failedAttempts;
+ if (
+ lastAttempt &&
+ now - lastAttempt.error.timestamp > FAILED_ATTEMPTS_CLEAR_INTERVAL
+ ) {
+ request.failedAttempts.clear();
+ }
+ }
+ }
+
private processQueue() {
const { queue, queueSegmentIds } = QueueUtils.generateQueue({
lastRequestedSegment: this.lastRequestedSegment,
@@ -137,48 +213,13 @@ export class HybridLoader {
);
},
});
+ this.processRequests(queueSegmentIds);
- for (const request of this.requests.items()) {
- if (request.status === "loading") {
- if (
- !request.isSegmentRequestedByEngine &&
- !queueSegmentIds.has(request.segment.localId)
- ) {
- request.abortFromProcessQueue();
- this.requests.remove(request);
- }
- continue;
- }
-
- if (request.status === "succeed") {
- const { type, data } = request;
- if (!type || !data) continue;
- if (type === "http") {
- this.p2pLoaders.currentLoader.broadcastAnnouncement();
- }
- void this.segmentStorage.storeSegment(request.segment, data);
- this.eventHandlers?.onSegmentLoaded?.(data.byteLength, type);
- this.requests.remove(request);
- continue;
- }
-
- if (request.status === "failed") {
- if (request.type === "http") {
- this.p2pLoaders.currentLoader.broadcastAnnouncement();
- }
- continue;
- }
-
- if (
- (request.status === "not-started" || request.status === "aborted") &&
- !request.isSegmentRequestedByEngine
- ) {
- this.requests.remove(request);
- }
- }
-
- const { simultaneousHttpDownloads, simultaneousP2PDownloads } =
- this.settings;
+ const {
+ simultaneousHttpDownloads,
+ simultaneousP2PDownloads,
+ httpErrorRetries,
+ } = this.settings;
for (const item of queue) {
const { statuses, segment } = item;
@@ -186,6 +227,13 @@ export class HybridLoader {
if (statuses.isHighDemand) {
if (request?.type === "http" && request.status === "loading") continue;
+ if (
+ request?.type === "http" &&
+ request.status === "failed" &&
+ request.failedAttempts.httpAttemptsCount >= httpErrorRetries
+ ) {
+ break;
+ }
const isP2PLoadingRequest =
request?.status === "loading" && request.type === "p2p";
@@ -248,7 +296,7 @@ export class HybridLoader {
private async loadThroughHttp(segment: Segment) {
const request = this.requests.getOrCreateRequest(segment);
- void fulfillHttpSegmentRequest(request, this.settings);
+ new HttpRequestExecutor(request, this.settings);
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
@@ -257,7 +305,7 @@ export class HybridLoader {
}
private loadRandomThroughHttp() {
- const { simultaneousHttpDownloads } = this.settings;
+ const { simultaneousHttpDownloads, httpErrorRetries } = this.settings;
const p2pLoader = this.p2pLoaders.currentLoader;
const connectedPeersAmount = p2pLoader.connectedPeersAmount;
if (
@@ -270,11 +318,17 @@ export class HybridLoader {
lastRequestedSegment: this.lastRequestedSegment,
playback: this.playback,
settings: this.settings,
- skipSegment: (segment, statuses) =>
- !statuses.isHttpDownloadable ||
- this.segmentStorage.hasSegment(segment) ||
- this.requests.isHybridLoaderRequested(segment) ||
- p2pLoader.isLoadingOrLoadedBySomeone(segment),
+ skipSegment: (segment, statuses) => {
+ const request = this.requests.get(segment);
+ return (
+ !statuses.isHttpDownloadable ||
+ this.segmentStorage.hasSegment(segment) ||
+ request?.type !== undefined ||
+ (request?.failedAttempts.httpAttemptsCount ?? 0) >=
+ httpErrorRetries ||
+ p2pLoader.isLoadingOrLoadedBySomeone(segment)
+ );
+ },
});
if (!queue.length) return;
const peersAmount = connectedPeersAmount + 1;
diff --git a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts
index 30ddeca6..d5cdd6f9 100644
--- a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts
+++ b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts
@@ -61,6 +61,7 @@ export function serializePeerCommand(
switch (command.c) {
case PeerCommandType.CancelSegmentRequest:
case PeerCommandType.SegmentAbsent:
+ case PeerCommandType.SegmentDataSendingCompleted:
return serializePeerSegmentCommand(command, maxChunkSize);
case PeerCommandType.SegmentRequest:
return serializePeerSegmentRequestCommand(command, maxChunkSize);
diff --git a/packages/p2p-media-loader-core/src/p2p/commands/types.ts b/packages/p2p-media-loader-core/src/p2p/commands/types.ts
index 27e9b144..cbe2aba3 100644
--- a/packages/p2p-media-loader-core/src/p2p/commands/types.ts
+++ b/packages/p2p-media-loader-core/src/p2p/commands/types.ts
@@ -6,12 +6,15 @@ export enum PeerCommandType {
SegmentsAnnouncement,
SegmentRequest,
SegmentData,
+ SegmentDataSendingCompleted,
SegmentAbsent,
CancelSegmentRequest,
}
export type PeerSegmentCommand = BasePeerCommand<
- PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest
+ | PeerCommandType.SegmentAbsent
+ | PeerCommandType.CancelSegmentRequest
+ | PeerCommandType.SegmentDataSendingCompleted
> & {
i: number; // segment id
};
diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts
index cfda5d49..c276ba8b 100644
--- a/packages/p2p-media-loader-core/src/p2p/loader.ts
+++ b/packages/p2p-media-loader-core/src/p2p/loader.ts
@@ -120,7 +120,7 @@ export class P2PLoader {
return;
}
void peer.uploadSegmentData(
- segmentExternalId,
+ segment,
byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData
);
};
diff --git a/packages/p2p-media-loader-core/src/p2p/peer-base.ts b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
similarity index 62%
rename from packages/p2p-media-loader-core/src/p2p/peer-base.ts
rename to packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
index 202d4912..eda3b608 100644
--- a/packages/p2p-media-loader-core/src/p2p/peer-base.ts
+++ b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
@@ -1,46 +1,52 @@
import { PeerConnection } from "bittorrent-tracker";
import * as Command from "./commands";
import * as Utils from "../utils/utils";
-import debug from "debug";
import { Settings } from "../types";
export type PeerSettings = Pick<
Settings,
- "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize"
+ "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" | "p2pErrorRetries"
>;
-export abstract class PeerBase {
- readonly id: string;
- private isUploadingSegment = false;
+export class PeerProtocol {
private commandChunks?: Command.BinaryCommandChunksJoiner;
- protected readonly logger = debug("core:peer");
+ private uploadingContext?: { stopUploading: () => void };
- protected constructor(
+ constructor(
private readonly connection: PeerConnection,
- protected readonly settings: PeerSettings
+ private readonly settings: PeerSettings,
+ private readonly eventHandlers: {
+ onCommandReceived: (command: Command.PeerCommand) => void;
+ onSegmentChunkReceived: (data: Uint8Array) => void;
+ onDestroy: () => void;
+ }
) {
- this.id = PeerBase.getPeerIdFromConnection(connection);
connection.on("data", this.onDataReceived);
connection.on("close", this.onPeerClosed);
connection.on("error", this.onConnectionError);
}
private onDataReceived = (data: Uint8Array) => {
- if (Command.isCommandChunk(data)) this.receivingCommandBytes(data);
- else this.receiveSegmentChunk(data);
+ if (Command.isCommandChunk(data)) {
+ this.receivingCommandBytes(data);
+ } else {
+ this.eventHandlers.onSegmentChunkReceived(data);
+ }
};
private onPeerClosed = () => {
- this.logger(`connection with peer closed: ${this.id}`);
this.destroy();
+ this.eventHandlers.onDestroy();
};
private onConnectionError = (error: { code: string }) => {
- this.logger(`peer error: ${this.id} ${error.code}`);
- this.destroy();
+ if (error.code === "ERR_DATA_CHANNEL") {
+ this.destroy();
+ this.eventHandlers.onDestroy();
+ }
};
- protected sendCommand(command: Command.PeerCommand) {
+ sendCommand(command: Command.PeerCommand) {
const binaryCommandBuffers = Command.serializePeerCommand(
command,
this.settings.webRtcMaxMessageSize
@@ -50,11 +56,26 @@ export abstract class PeerBase {
}
}
- protected async splitDataToChunksAndUploadAsync(data: Uint8Array) {
+ stopUploadingSegmentData() {
+ this.uploadingContext?.stopUploading();
+ this.uploadingContext = undefined;
+ }
+
+ async splitSegmentDataToChunksAndUploadAsync(data: Uint8Array) {
+ if (this.uploadingContext) {
+ throw new Error(`Some segment data is already uploading.`);
+ }
const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize);
const channel = this.connection._channel;
const { promise, resolve, reject } = Utils.getControlledPromise();
+ let isUploadingSegmentData = false;
+ this.uploadingContext = {
+ stopUploading: () => {
+ isUploadingSegmentData = false;
+ },
+ };
+
const sendChunk = () => {
while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) {
const chunk = chunks.next().value;
@@ -62,7 +83,7 @@ export abstract class PeerBase {
resolve();
break;
}
- if (chunk && !this.isUploadingSegment) {
+ if (chunk && !isUploadingSegmentData) {
reject();
break;
}
@@ -71,26 +92,23 @@ export abstract class PeerBase {
};
try {
channel.addEventListener("bufferedamountlow", sendChunk);
- this.isUploadingSegment = true;
+ isUploadingSegmentData = true;
sendChunk();
await promise;
return promise;
} finally {
- this.isUploadingSegment = false;
+ channel.removeEventListener("bufferedamountlow", sendChunk);
+ this.uploadingContext = undefined;
}
}
- protected cancelDataUploading() {
- this.isUploadingSegment = false;
- }
-
private receivingCommandBytes(buffer: Uint8Array) {
if (!this.commandChunks) {
this.commandChunks = new Command.BinaryCommandChunksJoiner(
(commandBuffer) => {
this.commandChunks = undefined;
const command = Command.deserializeCommand(commandBuffer);
- this.receiveCommand(command);
+ this.eventHandlers.onCommandReceived(command);
}
);
}
@@ -102,23 +120,15 @@ export abstract class PeerBase {
}
}
- protected abstract receiveCommand(command: Command.PeerCommand): void;
-
- protected abstract receiveSegmentChunk(data: Uint8Array): void;
-
- protected destroy() {
+ destroy() {
this.connection.destroy();
}
-
- static getPeerIdFromConnection(connection: PeerConnection) {
- return Utils.hexToUtf8(connection.id);
- }
}
function* getBufferChunks(
data: ArrayBuffer,
maxChunkSize: number
-): Generator {
+): Generator {
let bytesLeft = data.byteLength;
while (bytesLeft > 0) {
const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft;
diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts
index 98f14cd9..ab98e4c9 100644
--- a/packages/p2p-media-loader-core/src/p2p/peer.ts
+++ b/packages/p2p-media-loader-core/src/p2p/peer.ts
@@ -1,13 +1,16 @@
import { PeerConnection } from "bittorrent-tracker";
-import { PeerBase, PeerSettings } from "./peer-base";
+import { PeerProtocol, PeerSettings } from "./peer-protocol";
import {
Request,
RequestControls,
RequestError,
PeerRequestErrorType,
+ RequestInnerErrorType,
} from "../request";
import * as Command from "./commands";
import { Segment } from "../types";
+import * as Utils from "../utils/utils";
+import debug from "debug";
const { PeerCommandType } = Command;
type PeerEventHandlers = {
@@ -19,21 +22,36 @@ type PeerEventHandlers = {
) => void;
};
-export class Peer extends PeerBase {
- private requestContext?: { request: Request; controls: RequestControls };
+export class Peer {
+ readonly id: string;
+ private readonly peerProtocol;
+ private downloadingContext?: {
+ request: Request;
+ controls: RequestControls;
+ isSegmentDataCommandReceived: boolean;
+ };
private loadedSegments = new Set();
private httpLoadingSegments = new Set();
+ private downloadingErrors: RequestError<
+ PeerRequestErrorType | RequestInnerErrorType
+ >[] = [];
+ private logger = debug("core:peer");
constructor(
connection: PeerConnection,
private readonly eventHandlers: PeerEventHandlers,
- settings: PeerSettings
+ private readonly settings: PeerSettings
) {
- super(connection, settings);
+ this.id = Peer.getPeerIdFromConnection(connection);
+ this.peerProtocol = new PeerProtocol(connection, settings, {
+ onSegmentChunkReceived: this.onSegmentChunkReceived,
+ onCommandReceived: this.onCommandReceived,
+ onDestroy: this.destroy,
+ });
}
get downloadingSegment(): Segment | undefined {
- return this.requestContext?.request.segment;
+ return this.downloadingContext?.request.segment;
}
getSegmentStatus(segment: Segment): "loaded" | "http-loading" | undefined {
@@ -42,7 +60,7 @@ export class Peer extends PeerBase {
if (this.httpLoadingSegments.has(externalId)) return "http-loading";
}
- protected receiveCommand(command: Command.PeerCommand) {
+ private onCommandReceived = (command: Command.PeerCommand) => {
switch (command.c) {
case PeerCommandType.SegmentsAnnouncement:
this.loadedSegments = new Set(command.l);
@@ -50,63 +68,94 @@ export class Peer extends PeerBase {
break;
case PeerCommandType.SegmentRequest:
+ this.peerProtocol.stopUploadingSegmentData();
this.eventHandlers.onSegmentRequested(this, command.i, command.b);
break;
case PeerCommandType.SegmentData:
{
- const request = this.requestContext?.request;
- this.requestContext?.controls.firstBytesReceived();
- if (
- request?.segment.externalId === command.i &&
- request.totalBytes === undefined
- ) {
+ if (!this.downloadingContext) break;
+ const { request, controls } = this.downloadingContext;
+ if (request.segment.externalId !== command.i) break;
+ this.downloadingContext.isSegmentDataCommandReceived = true;
+ controls.firstBytesReceived();
+ if (request.totalBytes === undefined) {
request.setTotalBytes(command.s);
+ } else if (request.totalBytes - request.loadedBytes !== command.s) {
+ this.cancelSegmentDownloading("peer-response-bytes-mismatch");
+ request.clearLoadedBytes();
+ }
+ }
+ break;
+
+ case PeerCommandType.SegmentDataSendingCompleted:
+ if (this.downloadingContext?.request.segment.externalId === command.i) {
+ const { request, controls } = this.downloadingContext;
+ if (request.loadedBytes !== request.totalBytes) {
+ request.clearLoadedBytes();
+ this.cancelSegmentDownloading("peer-response-bytes-mismatch");
+ } else {
+ controls.completeOnSuccess();
+ this.downloadingContext = undefined;
}
}
break;
case PeerCommandType.SegmentAbsent:
- if (this.requestContext?.request.segment.externalId === command.i) {
+ if (this.downloadingContext?.request.segment.externalId === command.i) {
this.cancelSegmentDownloading("peer-segment-absent");
this.loadedSegments.delete(command.i);
}
break;
case PeerCommandType.CancelSegmentRequest:
- this.cancelDataUploading();
+ this.peerProtocol.stopUploadingSegmentData();
break;
}
- }
+ };
- protected receiveSegmentChunk(chunk: Uint8Array): void {
- if (!this.requestContext) return;
- const { request, controls } = this.requestContext;
+ protected onSegmentChunkReceived = (chunk: Uint8Array) => {
+ if (!this.downloadingContext?.isSegmentDataCommandReceived) return;
+ const { request, controls } = this.downloadingContext;
controls.addLoadedChunk(chunk);
+ if (request.totalBytes === undefined) return;
if (request.loadedBytes === request.totalBytes) {
controls.completeOnSuccess();
- this.requestContext = undefined;
- } else if (
- request.totalBytes !== undefined &&
- request.loadedBytes > request.totalBytes
- ) {
+ this.downloadingContext = undefined;
+ } else if (request.loadedBytes > request.totalBytes) {
+ request.clearLoadedBytes();
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
}
- }
+ };
downloadSegment(segmentRequest: Request) {
- if (this.requestContext) {
- throw new Error("Segment already is downloading");
+ if (this.downloadingContext) {
+ throw new Error("Some segment already is downloading");
}
- this.requestContext = {
+ this.downloadingContext = {
request: segmentRequest,
+ isSegmentDataCommandReceived: false,
controls: segmentRequest.start(
{ type: "p2p", peerId: this.id },
{
- abort: this.abortSegmentDownloading,
notReceivingBytesTimeoutMs:
this.settings.p2pNotReceivingBytesTimeoutMs,
+ abort: (error) => {
+ if (!this.downloadingContext) return;
+ const { request } = this.downloadingContext;
+ this.sendCancelSegmentRequestCommand(request.segment);
+ this.downloadingContext = undefined;
+ this.downloadingErrors.push(error);
+
+ const timeoutErrors = this.downloadingErrors.filter(
+ (error) => error.type === "bytes-receiving-timeout"
+ );
+ const { p2pErrorRetries } = this.settings;
+ if (timeoutErrors.length >= p2pErrorRetries) {
+ this.peerProtocol.destroy();
+ }
+ },
}
),
};
@@ -115,43 +164,41 @@ export class Peer extends PeerBase {
i: segmentRequest.segment.externalId,
};
if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes;
- this.sendCommand(command);
+ this.peerProtocol.sendCommand(command);
}
- private abortSegmentDownloading = () => {
- if (!this.requestContext) return;
- const { request } = this.requestContext;
- this.sendCancelSegmentRequestCommand(request.segment);
- this.requestContext = undefined;
- };
-
- async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) {
- this.logger(`send segment ${segmentExternalId} to ${this.id}`);
+ async uploadSegmentData(segment: Segment, data: ArrayBuffer) {
+ const { externalId } = segment;
+ this.logger(`send segment ${segment.externalId} to ${this.id}`);
const command: Command.PeerSendSegmentCommand = {
c: PeerCommandType.SegmentData,
- i: segmentExternalId,
+ i: externalId,
s: data.byteLength,
};
- this.sendCommand(command);
+ this.peerProtocol.sendCommand(command);
try {
- await this.splitDataToChunksAndUploadAsync(data as Uint8Array);
- this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`);
+ await this.peerProtocol.splitSegmentDataToChunksAndUploadAsync(
+ data as Uint8Array
+ );
+ this.sendSegmentDataSendingCompletedCommand(segment);
+ this.logger(`segment ${externalId} has been sent to ${this.id}`);
} catch (err) {
- this.logger(`cancel segment uploading ${segmentExternalId}`);
+ this.logger(`cancel segment uploading ${externalId}`);
}
}
private cancelSegmentDownloading(type: PeerRequestErrorType) {
- if (!this.requestContext) return;
- const { request, controls } = this.requestContext;
- const { segment } = request;
- this.logger(`cancel segment request ${segment.externalId} (${type})`);
- const error = new RequestError(type);
+ if (!this.downloadingContext) return;
+ const { request, controls } = this.downloadingContext;
if (type === "peer-response-bytes-mismatch") {
this.sendCancelSegmentRequestCommand(request.segment);
}
+ const { segment } = request;
+ this.logger(`cancel segment request ${segment.externalId} (${type})`);
+ const error = new RequestError(type);
controls.abortOnError(error);
- this.requestContext = undefined;
+ this.downloadingContext = undefined;
+ this.downloadingErrors.push(error);
}
sendSegmentsAnnouncementCommand(
@@ -163,26 +210,37 @@ export class Peer extends PeerBase {
p: httpLoadingSegmentsIds,
l: loadedSegmentsIds,
};
- this.sendCommand(command);
+ this.peerProtocol.sendCommand(command);
}
sendSegmentAbsentCommand(segmentExternalId: number) {
- this.sendCommand({
+ this.peerProtocol.sendCommand({
c: PeerCommandType.SegmentAbsent,
i: segmentExternalId,
});
}
private sendCancelSegmentRequestCommand(segment: Segment) {
- this.sendCommand({
+ this.peerProtocol.sendCommand({
c: PeerCommandType.CancelSegmentRequest,
i: segment.externalId,
});
}
- destroy() {
- super.destroy();
+ private sendSegmentDataSendingCompletedCommand(segment: Segment) {
+ this.peerProtocol.sendCommand({
+ c: PeerCommandType.SegmentDataSendingCompleted,
+ i: segment.externalId,
+ });
+ }
+
+ destroy = () => {
this.cancelSegmentDownloading("peer-closed");
this.eventHandlers.onPeerClosed(this);
+ this.logger(`peer closed ${this.id}`);
+ };
+
+ static getPeerIdFromConnection(connection: PeerConnection) {
+ return Utils.hexToUtf8(connection.id);
}
}
diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts
index f5f56667..46987820 100644
--- a/packages/p2p-media-loader-core/src/request-container.ts
+++ b/packages/p2p-media-loader-core/src/request-container.ts
@@ -72,9 +72,6 @@ export class RequestsContainer {
if (request.type === "p2p") yield request;
}
}
- isHybridLoaderRequested(segment: Segment): boolean {
- return !!this.requests.get(segment)?.type;
- }
destroy() {
for (const request of this.requests.values()) {
diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts
index 597f2aa8..2fa68fcc 100644
--- a/packages/p2p-media-loader-core/src/request.ts
+++ b/packages/p2p-media-loader-core/src/request.ts
@@ -37,7 +37,10 @@ export type RequestControls = Readonly<{
abortOnError: Request["abortOnError"];
}>;
-type OmitEncapsulated = Omit;
+type OmitEncapsulated = Omit<
+ T,
+ "error" | "errorTimestamp"
+>;
type StartRequestParameters =
| OmitEncapsulated
| OmitEncapsulated;
@@ -53,7 +56,7 @@ export class Request {
readonly id: string;
private _engineCallbacks?: EngineCallbacks;
private currentAttempt?: RequestAttempt;
- private _failedAttempts: RequestAttempt[] = [];
+ private _failedAttempts = new FailedRequestAttempts();
private finalData?: ArrayBuffer;
private bytes: Uint8Array[] = [];
private _loadedBytes = 0;
@@ -61,8 +64,11 @@ export class Request {
private _status: RequestStatus = "not-started";
private progress?: LoadProgress;
private notReceivingBytesTimeout: Timeout;
- private _abortRequestCallback?: (errorType: RequestInnerErrorType) => void;
+ private _abortRequestCallback?: (
+ error: RequestError
+ ) => void;
private readonly _logger: debug.Debugger;
+ private _isHandledByProcessQueue = false;
constructor(
readonly segment: Segment,
@@ -71,17 +77,33 @@ export class Request {
private readonly playback: Playback,
private readonly settings: StreamUtils.PlaybackTimeWindowsSettings
) {
- this.id = Request.getRequestItemId(segment);
+ this.id = Request.getRequestItemId(this.segment);
+ const { byteRange } = this.segment;
+ if (byteRange) {
+ const { end, start } = byteRange;
+ this._totalBytes = end - start + 1;
+ }
this.notReceivingBytesTimeout = new Timeout(this.abortOnTimeout);
const { type } = this.segment.stream;
this._logger = debug(`core:request-${type}`);
}
+ clearLoadedBytes() {
+ this._loadedBytes = 0;
+ this.bytes = [];
+ this._totalBytes = undefined;
+ }
+
get status() {
return this._status;
}
+ private setStatus(status: RequestStatus) {
+ this._status = status;
+ this._isHandledByProcessQueue = false;
+ }
+
get isSegmentRequestedByEngine(): boolean {
return !!this._engineCallbacks;
}
@@ -104,21 +126,25 @@ export class Request {
return this.finalData;
}
- get loadedPercent() {
- if (!this._totalBytes) return;
- return Utils.getPercent(this.loadedBytes, this._totalBytes);
+ get failedAttempts() {
+ return this._failedAttempts;
}
- get failedAttempts(): ReadonlyArray> {
- return this._failedAttempts;
+ get isHandledByProcessQueue() {
+ return this._isHandledByProcessQueue;
+ }
+
+ markHandledByProcessQueue() {
+ this._isHandledByProcessQueue = true;
}
- setOrResolveEngineCallbacks(callbacks: EngineCallbacks) {
+ setEngineCallbacks(callbacks: EngineCallbacks) {
if (this._engineCallbacks) {
throw new Error("Segment is already requested by engine");
}
+ this.failedAttempts.clear();
+ this._isHandledByProcessQueue = false;
this._engineCallbacks = callbacks;
- if (this.finalData) this.resolveEngineCallbacksSuccessfully(this.finalData);
}
setTotalBytes(value: number) {
@@ -132,17 +158,21 @@ export class Request {
requestData: StartRequestParameters,
controls: {
notReceivingBytesTimeoutMs?: number;
- abort: (errorType: RequestInnerErrorType) => void;
+ abort: (errorType: RequestError) => void;
}
): RequestControls {
if (this._status === "succeed") {
- throw new Error("Request has been already succeed.");
+ throw new Error(
+ `Request ${this.segment.externalId} has been already succeed.`
+ );
}
if (this._status === "loading") {
- throw new Error("Request has been already started.");
+ throw new Error(
+ `Request ${this.segment.externalId} has been already started.`
+ );
}
- this._status = "loading";
+ this.setStatus("loading");
this.currentAttempt = { ...requestData };
this.progress = {
startFromByte: this._loadedBytes,
@@ -175,15 +205,22 @@ export class Request {
};
}
- private resolveEngineCallbacksSuccessfully(data: ArrayBuffer) {
+ resolveEngineCallbacksSuccessfully() {
+ if (!this.finalData) return;
this._engineCallbacks?.onSuccess({
- data,
+ data: this.finalData,
bandwidth: this.bandwidthApproximator.getBandwidth(),
});
this._engineCallbacks = undefined;
}
+ resolveEngineCallbacksWithError() {
+ this._engineCallbacks?.onError(new CoreRequestError("failed"));
+ this._engineCallbacks = undefined;
+ }
+
abortFromEngine() {
+ if (this._status !== "loading") return;
this._engineCallbacks?.onError(new CoreRequestError("aborted"));
this._engineCallbacks = undefined;
this.requestProcessQueueCallback();
@@ -191,11 +228,11 @@ export class Request {
abortFromProcessQueue() {
this.throwErrorIfNotLoadingStatus();
- this._status = "aborted";
+ this.setStatus("aborted");
this.logger(
`${this.currentAttempt?.type} ${this.segment.externalId} aborted`
);
- this._abortRequestCallback?.("abort");
+ this._abortRequestCallback?.(new RequestError("abort"));
this._abortRequestCallback = undefined;
this.currentAttempt = undefined;
this.notReceivingBytesTimeout.clear();
@@ -205,13 +242,15 @@ export class Request {
this.throwErrorIfNotLoadingStatus();
if (!this.currentAttempt) return;
- this._status = "failed";
+ this.setStatus("failed");
const error = new RequestError("bytes-receiving-timeout");
- this._abortRequestCallback?.(error.type);
+ this._abortRequestCallback?.(error);
this.logger(`${this.type} ${this.segment.externalId} failed ${error.type}`);
- this.currentAttempt.error = error;
- this._failedAttempts.push(this.currentAttempt);
+ this._failedAttempts.add({
+ ...this.currentAttempt,
+ error,
+ });
this.notReceivingBytesTimeout.clear();
this.requestProcessQueueCallback();
};
@@ -220,10 +259,12 @@ export class Request {
this.throwErrorIfNotLoadingStatus();
if (!this.currentAttempt) return;
- this._status = "failed";
+ this.setStatus("failed");
this.logger(`${this.type} ${this.segment.externalId} failed ${error.type}`);
- this.currentAttempt.error = error;
- this._failedAttempts.push(this.currentAttempt);
+ this._failedAttempts.add({
+ ...this.currentAttempt,
+ error,
+ });
this.notReceivingBytesTimeout.clear();
this.requestProcessQueueCallback();
};
@@ -234,10 +275,10 @@ export class Request {
this.notReceivingBytesTimeout.clear();
this.finalData = Utils.joinChunks(this.bytes);
- this._status = "succeed";
+ this.setStatus("succeed");
this._totalBytes = this._loadedBytes;
- this.resolveEngineCallbacksSuccessfully(this.finalData);
+ this.resolveEngineCallbacksSuccessfully();
this.logger(
`${this.currentAttempt.type} ${this.segment.externalId} succeed`
);
@@ -277,9 +318,42 @@ export class Request {
}
}
+class FailedRequestAttempts {
+ private attempts: Required[] = [];
+ private _lastClearTimestamp = performance.now();
+
+ get lastClearTimestamp() {
+ return this._lastClearTimestamp;
+ }
+
+ add(attempt: Required) {
+ this.attempts.push(attempt);
+ }
+
+ get httpAttemptsCount() {
+ return this.attempts.reduce(
+ (sum, attempt) => (attempt.type === "http" ? sum + 1 : sum),
+ 0
+ );
+ }
+
+ get lastAttempt(): Readonly> | undefined {
+ return this.attempts[this.attempts.length - 1];
+ }
+
+ clear() {
+ this.attempts = [];
+ this._lastClearTimestamp = performance.now();
+ }
+}
+
const requestInnerErrorTypes = ["abort", "bytes-receiving-timeout"] as const;
-const httpRequestErrorTypes = ["fetch-error"] as const;
+const httpRequestErrorTypes = [
+ "http-error",
+ "http-bytes-mismatch",
+ "http-unexpected-status-code",
+] as const;
const peerRequestErrorTypes = [
"peer-response-bytes-mismatch",
@@ -299,18 +373,14 @@ type RequestErrorType =
export class RequestError<
T extends RequestErrorType = RequestErrorType,
> extends Error {
+ readonly timestamp: number;
+
constructor(
readonly type: T,
message?: string
) {
super(message);
- }
-
- static isRequestInnerErrorType(
- error: RequestError
- ): error is RequestError {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- return requestInnerErrorTypes.includes(error.type as any);
+ this.timestamp = performance.now();
}
}
diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts
index 3e55bf4f..78d36e21 100644
--- a/packages/p2p-media-loader-core/src/types.d.ts
+++ b/packages/p2p-media-loader-core/src/types.d.ts
@@ -59,6 +59,8 @@ export type Settings = {
p2pNotReceivingBytesTimeoutMs: number;
p2pLoaderDestroyTimeoutMs: number;
httpNotReceivingBytesTimeoutMs: number;
+ httpErrorRetries: number;
+ p2pErrorRetries: number;
};
export type CoreEventHandlers = {
diff --git a/packages/p2p-media-loader-shaka/src/stream-utils.ts b/packages/p2p-media-loader-shaka/src/stream-utils.ts
index 4a84c8a7..563a50ee 100644
--- a/packages/p2p-media-loader-shaka/src/stream-utils.ts
+++ b/packages/p2p-media-loader-shaka/src/stream-utils.ts
@@ -19,6 +19,7 @@ export function createSegment({
return {
localId: localId ?? getSegmentLocalId(url, byteRange),
externalId,
+ byteRange,
url,
startTime,
endTime,