diff --git a/p2p-media-loader-demo/index.html b/p2p-media-loader-demo/index.html
index fccca772..4f96d729 100644
--- a/p2p-media-loader-demo/index.html
+++ b/p2p-media-loader-demo/index.html
@@ -6,7 +6,7 @@
Vite + React + TS
-
+
diff --git a/p2p-media-loader-demo/src/App.tsx b/p2p-media-loader-demo/src/App.tsx
index 5603ce7b..43160052 100644
--- a/p2p-media-loader-demo/src/App.tsx
+++ b/p2p-media-loader-demo/src/App.tsx
@@ -23,6 +23,8 @@ const streamUrls = {
hlsBigBunnyBuck: "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8",
hlsByteRangeVideo:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8",
+ hlsOneLevelByteRangeVideo:
+ "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/gear1/prog_index.m3u8",
hlsAdvancedVideo:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/adv_dv_atmos/main.m3u8",
hlsAdvancedVideo2:
diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts
index d81d9d3b..722b83ae 100644
--- a/packages/p2p-media-loader-core/src/http-loader.ts
+++ b/packages/p2p-media-loader-core/src/http-loader.ts
@@ -5,14 +5,17 @@ export async function fulfillHttpSegmentRequest(
request: Request,
settings: Pick
) {
- const headers = new Headers();
- const { segment } = request;
+ const requestHeaders = new Headers();
+ const { segment, loadedBytes: alreadyLoadedBytes } = request;
const { url, byteRange } = segment;
- if (byteRange) {
- const { start, end } = byteRange;
- const byteRangeString = `bytes=${start}-${end}`;
- headers.set("Range", byteRangeString);
+ let byteFrom = byteRange?.start;
+ const byteTo = byteRange?.end;
+ if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes;
+
+ if (byteFrom !== undefined) {
+ const byteRangeString = `bytes=${byteFrom}-${byteTo ?? ""}`;
+ requestHeaders.set("Range", byteRangeString);
}
const abortController = new AbortController();
@@ -25,18 +28,27 @@ export async function fulfillHttpSegmentRequest(
);
try {
const fetchResponse = await window.fetch(url, {
- headers,
+ headers: requestHeaders,
signal: abortController.signal,
});
- requestControls.firstBytesReceived();
-
if (!fetchResponse.ok) {
throw new RequestError("fetch-error", fetchResponse.statusText);
}
-
if (!fetchResponse.body) return;
- const totalBytesString = fetchResponse.headers.get("Content-Length");
- if (totalBytesString) request.setTotalBytes(+totalBytesString);
+ requestControls.firstBytesReceived();
+
+ if (
+ byteFrom !== undefined &&
+ (fetchResponse.status !== 206 ||
+ !isResponseWithRequestedContentRange(fetchResponse, byteFrom, byteTo))
+ ) {
+ request.clearLoadedBytes();
+ }
+
+ if (request.totalBytes === undefined) {
+ const totalBytesString = fetchResponse.headers.get("Content-Length");
+ if (totalBytesString) request.setTotalBytes(+totalBytesString);
+ }
const reader = fetchResponse.body.getReader();
for await (const chunk of readStream(reader)) {
@@ -66,3 +78,55 @@ async function* readStream(
yield value;
}
}
+
+function getValueFromContentRangeHeader(headerValue: string) {
+ const match = headerValue
+ .trim()
+ .match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/);
+ if (!match) return;
+
+ const [, from, to, total] = match;
+ return {
+ from: from ? parseInt(from) : undefined,
+ to: to ? parseInt(to) : undefined,
+ total: total ? parseInt(total) : undefined,
+ };
+}
+
+function isResponseWithRequestedContentRange(
+ response: Response,
+ requestedFromByte: number,
+ requestedToByte?: number
+): boolean {
+ const requestedBytesAmount =
+ requestedToByte !== undefined
+ ? requestedToByte - requestedFromByte + 1
+ : undefined;
+
+ const { headers } = response;
+ const contentLengthHeader = headers.get("Content-Length");
+ const contentLength = contentLengthHeader && parseInt(contentLengthHeader);
+
+ if (
+ contentLength &&
+ requestedBytesAmount !== undefined &&
+ requestedBytesAmount !== contentLength
+ ) {
+ return false;
+ }
+
+ const contentRangeHeader = headers.get("Content-Range");
+ const contentRange =
+ contentRangeHeader && getValueFromContentRangeHeader(contentRangeHeader);
+ if (!contentRange) return true;
+ const { from, to } = contentRange;
+ if (from !== requestedFromByte) return false;
+ if (
+ to !== undefined &&
+ requestedToByte !== undefined &&
+ to !== requestedToByte
+ ) {
+ return false;
+ }
+ return true;
+}
diff --git a/packages/p2p-media-loader-core/src/p2p/peer-base.ts b/packages/p2p-media-loader-core/src/p2p/peer-base.ts
index 202d4912..2ebf2498 100644
--- a/packages/p2p-media-loader-core/src/p2p/peer-base.ts
+++ b/packages/p2p-media-loader-core/src/p2p/peer-base.ts
@@ -1,7 +1,6 @@
import { PeerConnection } from "bittorrent-tracker";
import * as Command from "./commands";
import * as Utils from "../utils/utils";
-import debug from "debug";
import { Settings } from "../types";
export type PeerSettings = Pick<
@@ -9,38 +8,44 @@ export type PeerSettings = Pick<
"p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize"
>;
-export abstract class PeerBase {
- readonly id: string;
- private isUploadingSegment = false;
+export class PeerInterface {
private commandChunks?: Command.BinaryCommandChunksJoiner;
- protected readonly logger = debug("core:peer");
+ private uploadingContext?: { stopUploading: () => void };
- protected constructor(
+ constructor(
private readonly connection: PeerConnection,
- protected readonly settings: PeerSettings
+ private readonly settings: PeerSettings,
+ private readonly eventHandlers: {
+ onCommandReceived: (command: Command.PeerCommand) => void;
+ onSegmentChunkReceived: (data: Uint8Array) => void;
+ onDestroy: () => void;
+ }
) {
- this.id = PeerBase.getPeerIdFromConnection(connection);
connection.on("data", this.onDataReceived);
connection.on("close", this.onPeerClosed);
connection.on("error", this.onConnectionError);
}
private onDataReceived = (data: Uint8Array) => {
- if (Command.isCommandChunk(data)) this.receivingCommandBytes(data);
- else this.receiveSegmentChunk(data);
+ if (Command.isCommandChunk(data)) {
+ this.receivingCommandBytes(data);
+ } else {
+ this.eventHandlers.onSegmentChunkReceived(data);
+ }
};
private onPeerClosed = () => {
- this.logger(`connection with peer closed: ${this.id}`);
this.destroy();
};
private onConnectionError = (error: { code: string }) => {
- this.logger(`peer error: ${this.id} ${error.code}`);
- this.destroy();
+ if (error.code === "ERR_DATA_CHANNEL") {
+ this.destroy();
+ this.eventHandlers.onDestroy();
+ }
};
- protected sendCommand(command: Command.PeerCommand) {
+ sendCommand(command: Command.PeerCommand) {
const binaryCommandBuffers = Command.serializePeerCommand(
command,
this.settings.webRtcMaxMessageSize
@@ -50,11 +55,26 @@ export abstract class PeerBase {
}
}
- protected async splitDataToChunksAndUploadAsync(data: Uint8Array) {
+ stopUploadingSegmentData() {
+ this.uploadingContext?.stopUploading();
+ this.uploadingContext = undefined;
+ }
+
+ async splitSegmentDataToChunksAndUploadAsync(data: Uint8Array) {
+ if (this.uploadingContext) {
+ throw new Error(`Some segment data is already uploading.`);
+ }
const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize);
const channel = this.connection._channel;
const { promise, resolve, reject } = Utils.getControlledPromise();
+ let isUploadingSegmentData = false;
+ this.uploadingContext = {
+ stopUploading: () => {
+ isUploadingSegmentData = false;
+ },
+ };
+
const sendChunk = () => {
while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) {
const chunk = chunks.next().value;
@@ -62,7 +82,7 @@ export abstract class PeerBase {
resolve();
break;
}
- if (chunk && !this.isUploadingSegment) {
+ if (chunk && !isUploadingSegmentData) {
reject();
break;
}
@@ -71,26 +91,23 @@ export abstract class PeerBase {
};
try {
channel.addEventListener("bufferedamountlow", sendChunk);
- this.isUploadingSegment = true;
+ isUploadingSegmentData = true;
sendChunk();
await promise;
return promise;
} finally {
- this.isUploadingSegment = false;
+ channel.removeEventListener("bufferedamountlow", sendChunk);
+ this.uploadingContext = undefined;
}
}
- protected cancelDataUploading() {
- this.isUploadingSegment = false;
- }
-
private receivingCommandBytes(buffer: Uint8Array) {
if (!this.commandChunks) {
this.commandChunks = new Command.BinaryCommandChunksJoiner(
(commandBuffer) => {
this.commandChunks = undefined;
const command = Command.deserializeCommand(commandBuffer);
- this.receiveCommand(command);
+ this.eventHandlers.onCommandReceived(command);
}
);
}
@@ -102,23 +119,16 @@ export abstract class PeerBase {
}
}
- protected abstract receiveCommand(command: Command.PeerCommand): void;
-
- protected abstract receiveSegmentChunk(data: Uint8Array): void;
-
- protected destroy() {
+ destroy() {
this.connection.destroy();
- }
-
- static getPeerIdFromConnection(connection: PeerConnection) {
- return Utils.hexToUtf8(connection.id);
+ this.eventHandlers.onDestroy();
}
}
function* getBufferChunks(
data: ArrayBuffer,
maxChunkSize: number
-): Generator {
+): Generator {
let bytesLeft = data.byteLength;
while (bytesLeft > 0) {
const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft;
diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts
index 98f14cd9..ac22edd2 100644
--- a/packages/p2p-media-loader-core/src/p2p/peer.ts
+++ b/packages/p2p-media-loader-core/src/p2p/peer.ts
@@ -1,5 +1,5 @@
import { PeerConnection } from "bittorrent-tracker";
-import { PeerBase, PeerSettings } from "./peer-base";
+import { PeerInterface, PeerSettings } from "./peer-base";
import {
Request,
RequestControls,
@@ -8,6 +8,8 @@ import {
} from "../request";
import * as Command from "./commands";
import { Segment } from "../types";
+import * as Utils from "../utils/utils";
+import debug from "debug";
const { PeerCommandType } = Command;
type PeerEventHandlers = {
@@ -19,21 +21,33 @@ type PeerEventHandlers = {
) => void;
};
-export class Peer extends PeerBase {
- private requestContext?: { request: Request; controls: RequestControls };
+export class Peer {
+ private readonly id: string;
+ private readonly peerInterface;
+ private downloadingContext?: {
+ request: Request;
+ controls: RequestControls;
+ isSegmentDataCommandReceived: boolean;
+ };
private loadedSegments = new Set();
private httpLoadingSegments = new Set();
+ private logger = debug("core:peer");
constructor(
connection: PeerConnection,
private readonly eventHandlers: PeerEventHandlers,
- settings: PeerSettings
+ private readonly settings: PeerSettings
) {
- super(connection, settings);
+ this.id = Peer.getPeerIdFromConnection(connection);
+ this.peerInterface = new PeerInterface(connection, settings, {
+ onSegmentChunkReceived: this.onSegmentChunkReceived,
+ onCommandReceived: this.onCommandReceived,
+ onDestroy: this.destroy,
+ });
}
get downloadingSegment(): Segment | undefined {
- return this.requestContext?.request.segment;
+ return this.downloadingContext?.request.segment;
}
getSegmentStatus(segment: Segment): "loaded" | "http-loading" | undefined {
@@ -42,7 +56,7 @@ export class Peer extends PeerBase {
if (this.httpLoadingSegments.has(externalId)) return "http-loading";
}
- protected receiveCommand(command: Command.PeerCommand) {
+ private onCommandReceived = (command: Command.PeerCommand) => {
switch (command.c) {
case PeerCommandType.SegmentsAnnouncement:
this.loadedSegments = new Set(command.l);
@@ -50,15 +64,18 @@ export class Peer extends PeerBase {
break;
case PeerCommandType.SegmentRequest:
+ this.peerInterface.stopUploadingSegmentData();
this.eventHandlers.onSegmentRequested(this, command.i, command.b);
break;
case PeerCommandType.SegmentData:
{
- const request = this.requestContext?.request;
- this.requestContext?.controls.firstBytesReceived();
+ if (!this.downloadingContext) break;
+ this.downloadingContext.isSegmentDataCommandReceived = true;
+ const { request, controls } = this.downloadingContext;
+ controls.firstBytesReceived();
if (
- request?.segment.externalId === command.i &&
+ request.segment.externalId === command.i &&
request.totalBytes === undefined
) {
request.setTotalBytes(command.s);
@@ -67,40 +84,39 @@ export class Peer extends PeerBase {
break;
case PeerCommandType.SegmentAbsent:
- if (this.requestContext?.request.segment.externalId === command.i) {
+ if (this.downloadingContext?.request.segment.externalId === command.i) {
this.cancelSegmentDownloading("peer-segment-absent");
this.loadedSegments.delete(command.i);
}
break;
case PeerCommandType.CancelSegmentRequest:
- this.cancelDataUploading();
+ this.peerInterface.stopUploadingSegmentData();
break;
}
- }
+ };
- protected receiveSegmentChunk(chunk: Uint8Array): void {
- if (!this.requestContext) return;
- const { request, controls } = this.requestContext;
+ protected onSegmentChunkReceived = (chunk: Uint8Array) => {
+ if (!this.downloadingContext?.isSegmentDataCommandReceived) return;
+ const { request, controls } = this.downloadingContext;
controls.addLoadedChunk(chunk);
+ if (request.totalBytes === undefined) return;
if (request.loadedBytes === request.totalBytes) {
controls.completeOnSuccess();
- this.requestContext = undefined;
- } else if (
- request.totalBytes !== undefined &&
- request.loadedBytes > request.totalBytes
- ) {
+ this.downloadingContext = undefined;
+ } else if (request.loadedBytes > request.totalBytes) {
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
}
- }
+ };
downloadSegment(segmentRequest: Request) {
- if (this.requestContext) {
- throw new Error("Segment already is downloading");
+ if (this.downloadingContext) {
+ throw new Error("Some segment already is downloading");
}
- this.requestContext = {
+ this.downloadingContext = {
request: segmentRequest,
+ isSegmentDataCommandReceived: false,
controls: segmentRequest.start(
{ type: "p2p", peerId: this.id },
{
@@ -115,14 +131,14 @@ export class Peer extends PeerBase {
i: segmentRequest.segment.externalId,
};
if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes;
- this.sendCommand(command);
+ this.peerInterface.sendCommand(command);
}
private abortSegmentDownloading = () => {
- if (!this.requestContext) return;
- const { request } = this.requestContext;
+ if (!this.downloadingContext) return;
+ const { request } = this.downloadingContext;
this.sendCancelSegmentRequestCommand(request.segment);
- this.requestContext = undefined;
+ this.downloadingContext = undefined;
};
async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) {
@@ -132,9 +148,11 @@ export class Peer extends PeerBase {
i: segmentExternalId,
s: data.byteLength,
};
- this.sendCommand(command);
+ this.peerInterface.sendCommand(command);
try {
- await this.splitDataToChunksAndUploadAsync(data as Uint8Array);
+ await this.peerInterface.splitSegmentDataToChunksAndUploadAsync(
+ data as Uint8Array
+ );
this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`);
} catch (err) {
this.logger(`cancel segment uploading ${segmentExternalId}`);
@@ -142,16 +160,16 @@ export class Peer extends PeerBase {
}
private cancelSegmentDownloading(type: PeerRequestErrorType) {
- if (!this.requestContext) return;
- const { request, controls } = this.requestContext;
- const { segment } = request;
- this.logger(`cancel segment request ${segment.externalId} (${type})`);
- const error = new RequestError(type);
+ if (!this.downloadingContext) return;
+ const { request, controls } = this.downloadingContext;
if (type === "peer-response-bytes-mismatch") {
this.sendCancelSegmentRequestCommand(request.segment);
}
+ const { segment } = request;
+ this.logger(`cancel segment request ${segment.externalId} (${type})`);
+ const error = new RequestError(type);
controls.abortOnError(error);
- this.requestContext = undefined;
+ this.downloadingContext = undefined;
}
sendSegmentsAnnouncementCommand(
@@ -163,26 +181,30 @@ export class Peer extends PeerBase {
p: httpLoadingSegmentsIds,
l: loadedSegmentsIds,
};
- this.sendCommand(command);
+ this.peerInterface.sendCommand(command);
}
sendSegmentAbsentCommand(segmentExternalId: number) {
- this.sendCommand({
+ this.peerInterface.sendCommand({
c: PeerCommandType.SegmentAbsent,
i: segmentExternalId,
});
}
private sendCancelSegmentRequestCommand(segment: Segment) {
- this.sendCommand({
+ this.peerInterface.sendCommand({
c: PeerCommandType.CancelSegmentRequest,
i: segment.externalId,
});
}
- destroy() {
- super.destroy();
+ destroy = () => {
+ this.peerInterface.destroy();
this.cancelSegmentDownloading("peer-closed");
this.eventHandlers.onPeerClosed(this);
+ };
+
+ static getPeerIdFromConnection(connection: PeerConnection) {
+ return Utils.hexToUtf8(connection.id);
}
}
diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts
index a36cbef9..a63f7eae 100644
--- a/packages/p2p-media-loader-core/src/request.ts
+++ b/packages/p2p-media-loader-core/src/request.ts
@@ -71,13 +71,24 @@ export class Request {
private readonly playback: Playback,
private readonly settings: StreamUtils.PlaybackTimeWindowsSettings
) {
- this.id = Request.getRequestItemId(segment);
+ this.id = Request.getRequestItemId(this.segment);
+ const { byteRange } = this.segment;
+ if (byteRange) {
+ const { end, start } = byteRange;
+ this._totalBytes = end - start + 1;
+ }
this.notReceivingBytesTimeout = new Timeout(this.abortOnTimeout);
const { type } = this.segment.stream;
this._logger = debug(`core:request-${type}`);
}
+ clearLoadedBytes() {
+ this._loadedBytes = 0;
+ this.bytes = [];
+ this._totalBytes = undefined;
+ }
+
get status() {
return this._status;
}
@@ -136,10 +147,14 @@ export class Request {
}
): RequestControls {
if (this._status === "succeed") {
- throw new Error("Request has been already succeed.");
+ throw new Error(
+ `Request ${this.segment.externalId} has been already succeed.`
+ );
}
if (this._status === "loading") {
- throw new Error("Request has been already started.");
+ throw new Error(
+ `Request ${this.segment.externalId} has been already started.`
+ );
}
this._status = "loading";
diff --git a/packages/p2p-media-loader-shaka/src/stream-utils.ts b/packages/p2p-media-loader-shaka/src/stream-utils.ts
index a0a955a1..3b45b099 100644
--- a/packages/p2p-media-loader-shaka/src/stream-utils.ts
+++ b/packages/p2p-media-loader-shaka/src/stream-utils.ts
@@ -19,6 +19,7 @@ export function createSegment({
return {
localId: localId ?? getSegmentLocalId(url, byteRange),
externalId,
+ byteRange,
url,
startTime,
endTime,