Add ability to resume segment downloading starting from a certain byte.
i-zolotarenko committed Dec 21, 2023
1 parent ba0976e commit 4a9bae9
Showing 7 changed files with 205 additions and 91 deletions.
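
In short, the loader now remembers how many bytes of a segment have already been received and shifts the start of the HTTP Range header by that amount, so an interrupted segment download can resume instead of restarting from byte zero. A minimal sketch of the idea follows; the ByteRange shape and the helper name are illustrative, not this package's exact API (the commit does this inline in http-loader.ts).

// Sketch only: build a Range header that resumes a segment download after
// `alreadyLoadedBytes` bytes have already been received.
interface ByteRange {
  start: number;
  end?: number;
}

function buildRangeHeader(
  byteRange: ByteRange | undefined,
  alreadyLoadedBytes: number
): string | undefined {
  let byteFrom = byteRange?.start;
  const byteTo = byteRange?.end;
  // Skip past the bytes that are already buffered locally.
  if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes;
  if (byteFrom === undefined) return undefined;
  // An open-ended range ("bytes=400-") requests everything after byteFrom.
  return `bytes=${byteFrom}-${byteTo ?? ""}`;
}

// A segment spanning bytes 0-999 with 400 bytes already downloaded resumes
// with "Range: bytes=400-999".
console.log(buildRangeHeader({ start: 0, end: 999 }, 400));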
2 changes: 1 addition & 1 deletion p2p-media-loader-demo/index.html
@@ -6,7 +6,7 @@
<title>Vite + React + TS</title>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/clappr@latest"></script>
<script type="text/javascript" src="https://cdn.jsdelivr.net/gh/clappr/clappr-level-selector-plugin@latest/dist/level-selector.min.js"></script>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/shaka-player@~4.4.0/dist/shaka-player.compiled.min.js"></script>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/shaka-player@~4.6.0/dist/shaka-player.compiled.min.js"></script>
<script type="text/javascript" src="https://cdn.jsdelivr.net/gh/clappr/dash-shaka-playback@latest/dist/dash-shaka-playback.external.js"></script>
</head>
<body>
2 changes: 2 additions & 0 deletions p2p-media-loader-demo/src/App.tsx
@@ -23,6 +23,8 @@ const streamUrls = {
hlsBigBunnyBuck: "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8",
hlsByteRangeVideo:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8",
hlsOneLevelByteRangeVideo:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/gear1/prog_index.m3u8",
hlsAdvancedVideo:
"https://devstreaming-cdn.apple.com/videos/streaming/examples/adv_dv_atmos/main.m3u8",
hlsAdvancedVideo2:
88 changes: 76 additions & 12 deletions packages/p2p-media-loader-core/src/http-loader.ts
@@ -5,14 +5,17 @@ export async function fulfillHttpSegmentRequest(
request: Request,
settings: Pick<Settings, "httpNotReceivingBytesTimeoutMs">
) {
const headers = new Headers();
const { segment } = request;
const requestHeaders = new Headers();
const { segment, loadedBytes: alreadyLoadedBytes } = request;
const { url, byteRange } = segment;

if (byteRange) {
const { start, end } = byteRange;
const byteRangeString = `bytes=${start}-${end}`;
headers.set("Range", byteRangeString);
let byteFrom = byteRange?.start;
const byteTo = byteRange?.end;
if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes;

if (byteFrom !== undefined) {
const byteRangeString = `bytes=${byteFrom}-${byteTo ?? ""}`;
requestHeaders.set("Range", byteRangeString);
}

const abortController = new AbortController();
@@ -25,18 +28,27 @@
);
try {
const fetchResponse = await window.fetch(url, {
headers,
headers: requestHeaders,
signal: abortController.signal,
});
requestControls.firstBytesReceived();

if (!fetchResponse.ok) {
throw new RequestError("fetch-error", fetchResponse.statusText);
}

if (!fetchResponse.body) return;
const totalBytesString = fetchResponse.headers.get("Content-Length");
if (totalBytesString) request.setTotalBytes(+totalBytesString);
requestControls.firstBytesReceived();

if (
byteFrom !== undefined &&
(fetchResponse.status !== 206 ||
!isResponseWithRequestedContentRange(fetchResponse, byteFrom, byteTo))
) {
request.clearLoadedBytes();
}

if (request.totalBytes === undefined) {
const totalBytesString = fetchResponse.headers.get("Content-Length");
if (totalBytesString) request.setTotalBytes(+totalBytesString);
}

const reader = fetchResponse.body.getReader();
for await (const chunk of readStream(reader)) {
@@ -66,3 +78,55 @@ async function* readStream(
yield value;
}
}

function getValueFromContentRangeHeader(headerValue: string) {
const match = headerValue
.trim()
.match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/);
if (!match) return;

const [, from, to, total] = match;
return {
from: from ? parseInt(from) : undefined,
to: to ? parseInt(to) : undefined,
total: total ? parseInt(total) : undefined,
};
}

function isResponseWithRequestedContentRange(
response: Response,
requestedFromByte: number,
requestedToByte?: number
): boolean {
const requestedBytesAmount =
requestedToByte !== undefined
? requestedToByte - requestedFromByte + 1
: undefined;

const { headers } = response;
const contentLengthHeader = headers.get("Content-Length");
const contentLength = contentLengthHeader && parseInt(contentLengthHeader);

if (
contentLength &&
requestedBytesAmount !== undefined &&
requestedBytesAmount !== contentLength
) {
return false;
}

const contentRangeHeader = headers.get("Content-Range");
const contentRange =
contentRangeHeader && getValueFromContentRangeHeader(contentRangeHeader);
if (!contentRange) return true;
const { from, to } = contentRange;
if (from !== requestedFromByte) return false;
if (
to !== undefined &&
requestedToByte !== undefined &&
to !== requestedToByte
) {
return false;
}
return true;
}
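
The 206/Content-Range check above protects against servers that ignore the Range header and answer 200 with the whole resource; in that case request.clearLoadedBytes() discards the partial data and the segment is written from the start. A worked example of the happy path, reusing the same parsing logic on an illustrative header value:

// Illustrative values: a request resumed with "Range: bytes=400-999" should
// come back as "206 Partial Content" with a matching Content-Range header.
const sampleHeader = "bytes 400-999/1500000";
const match = sampleHeader
  .trim()
  .match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/);
if (match) {
  const [, from, to, total] = match;
  console.log({
    from: from ? parseInt(from) : undefined, // 400
    to: to ? parseInt(to) : undefined, // 999
    total: total ? parseInt(total) : undefined, // 1500000
  });
}
// A server that ignores the Range header typically replies 200 with the full
// Content-Length and no Content-Range, so the check fails and the loader
// restarts the segment from byte 0.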
76 changes: 43 additions & 33 deletions packages/p2p-media-loader-core/src/p2p/peer-base.ts
@@ -1,46 +1,51 @@
import { PeerConnection } from "bittorrent-tracker";
import * as Command from "./commands";
import * as Utils from "../utils/utils";
import debug from "debug";
import { Settings } from "../types";

export type PeerSettings = Pick<
Settings,
"p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize"
>;

export abstract class PeerBase {
readonly id: string;
private isUploadingSegment = false;
export class PeerInterface {
private commandChunks?: Command.BinaryCommandChunksJoiner;
protected readonly logger = debug("core:peer");
private uploadingContext?: { stopUploading: () => void };

protected constructor(
constructor(
private readonly connection: PeerConnection,
protected readonly settings: PeerSettings
private readonly settings: PeerSettings,
private readonly eventHandlers: {
onCommandReceived: (command: Command.PeerCommand) => void;
onSegmentChunkReceived: (data: Uint8Array) => void;
onDestroy: () => void;
}
) {
this.id = PeerBase.getPeerIdFromConnection(connection);
connection.on("data", this.onDataReceived);
connection.on("close", this.onPeerClosed);
connection.on("error", this.onConnectionError);
}

private onDataReceived = (data: Uint8Array) => {
if (Command.isCommandChunk(data)) this.receivingCommandBytes(data);
else this.receiveSegmentChunk(data);
if (Command.isCommandChunk(data)) {
this.receivingCommandBytes(data);
} else {
this.eventHandlers.onSegmentChunkReceived(data);
}
};

private onPeerClosed = () => {
this.logger(`connection with peer closed: ${this.id}`);
this.destroy();
};

private onConnectionError = (error: { code: string }) => {
this.logger(`peer error: ${this.id} ${error.code}`);
this.destroy();
if (error.code === "ERR_DATA_CHANNEL") {
this.destroy();
this.eventHandlers.onDestroy();
}
};

protected sendCommand(command: Command.PeerCommand) {
sendCommand(command: Command.PeerCommand) {
const binaryCommandBuffers = Command.serializePeerCommand(
command,
this.settings.webRtcMaxMessageSize
@@ -50,19 +55,34 @@ export abstract class PeerBase {
}
}

protected async splitDataToChunksAndUploadAsync(data: Uint8Array) {
stopUploadingSegmentData() {
this.uploadingContext?.stopUploading();
this.uploadingContext = undefined;
}

async splitSegmentDataToChunksAndUploadAsync(data: Uint8Array) {
if (this.uploadingContext) {
throw new Error(`Some segment data is already uploading.`);
}
const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize);
const channel = this.connection._channel;
const { promise, resolve, reject } = Utils.getControlledPromise<void>();

let isUploadingSegmentData = false;
this.uploadingContext = {
stopUploading: () => {
isUploadingSegmentData = false;
},
};

const sendChunk = () => {
while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) {
const chunk = chunks.next().value;
if (!chunk) {
resolve();
break;
}
if (chunk && !this.isUploadingSegment) {
if (chunk && !isUploadingSegmentData) {
reject();
break;
}
@@ -71,26 +91,23 @@
};
try {
channel.addEventListener("bufferedamountlow", sendChunk);
this.isUploadingSegment = true;
isUploadingSegmentData = true;
sendChunk();
await promise;
return promise;
} finally {
this.isUploadingSegment = false;
channel.removeEventListener("bufferedamountlow", sendChunk);
this.uploadingContext = undefined;
}
}

protected cancelDataUploading() {
this.isUploadingSegment = false;
}

private receivingCommandBytes(buffer: Uint8Array) {
if (!this.commandChunks) {
this.commandChunks = new Command.BinaryCommandChunksJoiner(
(commandBuffer) => {
this.commandChunks = undefined;
const command = Command.deserializeCommand(commandBuffer);
this.receiveCommand(command);
this.eventHandlers.onCommandReceived(command);
}
);
}
@@ -102,23 +119,16 @@
}
}

protected abstract receiveCommand(command: Command.PeerCommand): void;

protected abstract receiveSegmentChunk(data: Uint8Array): void;

protected destroy() {
destroy() {
this.connection.destroy();
}

static getPeerIdFromConnection(connection: PeerConnection) {
return Utils.hexToUtf8(connection.id);
this.eventHandlers.onDestroy();
}
}

function* getBufferChunks(
data: ArrayBuffer,
maxChunkSize: number
): Generator<ArrayBuffer> {
): Generator<ArrayBuffer, void> {
let bytesLeft = data.byteLength;
while (bytesLeft > 0) {
const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft;
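
The peer refactor replaces inheritance (an abstract PeerBase with protected hooks) with composition: a concrete PeerInterface reports events through plain callbacks and exposes a cancellable upload. A hypothetical wiring sketch, assuming a caller in the same p2p directory; the surrounding Peer class is not part of this diff.

import { PeerConnection } from "bittorrent-tracker";
import * as Command from "./commands";
import { PeerInterface, PeerSettings } from "./peer-base";

// Hypothetical caller: collects downloaded chunks and reacts to peer commands.
function wirePeer(connection: PeerConnection, settings: PeerSettings) {
  const receivedChunks: Uint8Array[] = [];

  const peer = new PeerInterface(connection, settings, {
    onCommandReceived: (command: Command.PeerCommand) => {
      // Dispatch protocol commands (segment requests, cancellations, ...).
      console.debug("peer command received", command);
    },
    onSegmentChunkReceived: (chunk: Uint8Array) => {
      receivedChunks.push(chunk); // accumulate the segment being downloaded
    },
    onDestroy: () => {
      // Remove this peer from the pool once the connection is gone.
    },
  });

  return peer;
}

// Uploading is now cancellable without destroying the connection:
//   void peer.splitSegmentDataToChunksAndUploadAsync(segmentData);
//   peer.stopUploadingSegmentData();

Cancelling only flips a flag read by the sendChunk loop, so the upload promise rejects on the next buffered-amount-low event instead of tearing down the data channel.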