From 4a9bae97ddb68824d126930bf8857b827872a536 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Thu, 21 Dec 2023 20:15:15 +0200 Subject: [PATCH] Add ability to resume segment downloading starting from certain byte. --- p2p-media-loader-demo/index.html | 2 +- p2p-media-loader-demo/src/App.tsx | 2 + .../p2p-media-loader-core/src/http-loader.ts | 88 +++++++++++++-- .../src/p2p/peer-base.ts | 76 +++++++------ .../p2p-media-loader-core/src/p2p/peer.ts | 106 +++++++++++------- packages/p2p-media-loader-core/src/request.ts | 21 +++- .../src/stream-utils.ts | 1 + 7 files changed, 205 insertions(+), 91 deletions(-) diff --git a/p2p-media-loader-demo/index.html b/p2p-media-loader-demo/index.html index fccca772..4f96d729 100644 --- a/p2p-media-loader-demo/index.html +++ b/p2p-media-loader-demo/index.html @@ -6,7 +6,7 @@ Vite + React + TS - + diff --git a/p2p-media-loader-demo/src/App.tsx b/p2p-media-loader-demo/src/App.tsx index 5603ce7b..43160052 100644 --- a/p2p-media-loader-demo/src/App.tsx +++ b/p2p-media-loader-demo/src/App.tsx @@ -23,6 +23,8 @@ const streamUrls = { hlsBigBunnyBuck: "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8", hlsByteRangeVideo: "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/bipbop_16x9_variant.m3u8", + hlsOneLevelByteRangeVideo: + "https://devstreaming-cdn.apple.com/videos/streaming/examples/bipbop_16x9/gear1/prog_index.m3u8", hlsAdvancedVideo: "https://devstreaming-cdn.apple.com/videos/streaming/examples/adv_dv_atmos/main.m3u8", hlsAdvancedVideo2: diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index d81d9d3b..722b83ae 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -5,14 +5,17 @@ export async function fulfillHttpSegmentRequest( request: Request, settings: Pick ) { - const headers = new Headers(); - const { segment } = request; + const requestHeaders = new Headers(); + const { segment, loadedBytes: alreadyLoadedBytes } = request; const { url, byteRange } = segment; - if (byteRange) { - const { start, end } = byteRange; - const byteRangeString = `bytes=${start}-${end}`; - headers.set("Range", byteRangeString); + let byteFrom = byteRange?.start; + const byteTo = byteRange?.end; + if (alreadyLoadedBytes !== 0) byteFrom = (byteFrom ?? 0) + alreadyLoadedBytes; + + if (byteFrom !== undefined) { + const byteRangeString = `bytes=${byteFrom}-${byteTo ?? ""}`; + requestHeaders.set("Range", byteRangeString); } const abortController = new AbortController(); @@ -25,18 +28,27 @@ export async function fulfillHttpSegmentRequest( ); try { const fetchResponse = await window.fetch(url, { - headers, + headers: requestHeaders, signal: abortController.signal, }); - requestControls.firstBytesReceived(); - if (!fetchResponse.ok) { throw new RequestError("fetch-error", fetchResponse.statusText); } - if (!fetchResponse.body) return; - const totalBytesString = fetchResponse.headers.get("Content-Length"); - if (totalBytesString) request.setTotalBytes(+totalBytesString); + requestControls.firstBytesReceived(); + + if ( + byteFrom !== undefined && + (fetchResponse.status !== 206 || + !isResponseWithRequestedContentRange(fetchResponse, byteFrom, byteTo)) + ) { + request.clearLoadedBytes(); + } + + if (request.totalBytes === undefined) { + const totalBytesString = fetchResponse.headers.get("Content-Length"); + if (totalBytesString) request.setTotalBytes(+totalBytesString); + } const reader = fetchResponse.body.getReader(); for await (const chunk of readStream(reader)) { @@ -66,3 +78,55 @@ async function* readStream( yield value; } } + +function getValueFromContentRangeHeader(headerValue: string) { + const match = headerValue + .trim() + .match(/^bytes (?:(?:(\d+)|)-(?:(\d+)|)|\*)\/(?:(\d+)|\*)$/); + if (!match) return; + + const [, from, to, total] = match; + return { + from: from ? parseInt(from) : undefined, + to: to ? parseInt(to) : undefined, + total: total ? parseInt(total) : undefined, + }; +} + +function isResponseWithRequestedContentRange( + response: Response, + requestedFromByte: number, + requestedToByte?: number +): boolean { + const requestedBytesAmount = + requestedToByte !== undefined + ? requestedToByte - requestedFromByte + 1 + : undefined; + + const { headers } = response; + const contentLengthHeader = headers.get("Content-Length"); + const contentLength = contentLengthHeader && parseInt(contentLengthHeader); + + if ( + contentLength && + requestedBytesAmount !== undefined && + requestedBytesAmount !== contentLength + ) { + return false; + } + + const contentRangeHeader = headers.get("Content-Range"); + const contentRange = + contentRangeHeader && getValueFromContentRangeHeader(contentRangeHeader); + if (!contentRange) return true; + const { from, to } = contentRange; + if (from !== requestedFromByte) return false; + if ( + to !== undefined && + requestedToByte !== undefined && + to !== requestedToByte + ) { + return false; + } + return true; +} diff --git a/packages/p2p-media-loader-core/src/p2p/peer-base.ts b/packages/p2p-media-loader-core/src/p2p/peer-base.ts index 202d4912..2ebf2498 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer-base.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer-base.ts @@ -1,7 +1,6 @@ import { PeerConnection } from "bittorrent-tracker"; import * as Command from "./commands"; import * as Utils from "../utils/utils"; -import debug from "debug"; import { Settings } from "../types"; export type PeerSettings = Pick< @@ -9,38 +8,44 @@ export type PeerSettings = Pick< "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" >; -export abstract class PeerBase { - readonly id: string; - private isUploadingSegment = false; +export class PeerInterface { private commandChunks?: Command.BinaryCommandChunksJoiner; - protected readonly logger = debug("core:peer"); + private uploadingContext?: { stopUploading: () => void }; - protected constructor( + constructor( private readonly connection: PeerConnection, - protected readonly settings: PeerSettings + private readonly settings: PeerSettings, + private readonly eventHandlers: { + onCommandReceived: (command: Command.PeerCommand) => void; + onSegmentChunkReceived: (data: Uint8Array) => void; + onDestroy: () => void; + } ) { - this.id = PeerBase.getPeerIdFromConnection(connection); connection.on("data", this.onDataReceived); connection.on("close", this.onPeerClosed); connection.on("error", this.onConnectionError); } private onDataReceived = (data: Uint8Array) => { - if (Command.isCommandChunk(data)) this.receivingCommandBytes(data); - else this.receiveSegmentChunk(data); + if (Command.isCommandChunk(data)) { + this.receivingCommandBytes(data); + } else { + this.eventHandlers.onSegmentChunkReceived(data); + } }; private onPeerClosed = () => { - this.logger(`connection with peer closed: ${this.id}`); this.destroy(); }; private onConnectionError = (error: { code: string }) => { - this.logger(`peer error: ${this.id} ${error.code}`); - this.destroy(); + if (error.code === "ERR_DATA_CHANNEL") { + this.destroy(); + this.eventHandlers.onDestroy(); + } }; - protected sendCommand(command: Command.PeerCommand) { + sendCommand(command: Command.PeerCommand) { const binaryCommandBuffers = Command.serializePeerCommand( command, this.settings.webRtcMaxMessageSize @@ -50,11 +55,26 @@ export abstract class PeerBase { } } - protected async splitDataToChunksAndUploadAsync(data: Uint8Array) { + stopUploadingSegmentData() { + this.uploadingContext?.stopUploading(); + this.uploadingContext = undefined; + } + + async splitSegmentDataToChunksAndUploadAsync(data: Uint8Array) { + if (this.uploadingContext) { + throw new Error(`Some segment data is already uploading.`); + } const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize); const channel = this.connection._channel; const { promise, resolve, reject } = Utils.getControlledPromise(); + let isUploadingSegmentData = false; + this.uploadingContext = { + stopUploading: () => { + isUploadingSegmentData = false; + }, + }; + const sendChunk = () => { while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) { const chunk = chunks.next().value; @@ -62,7 +82,7 @@ export abstract class PeerBase { resolve(); break; } - if (chunk && !this.isUploadingSegment) { + if (chunk && !isUploadingSegmentData) { reject(); break; } @@ -71,26 +91,23 @@ export abstract class PeerBase { }; try { channel.addEventListener("bufferedamountlow", sendChunk); - this.isUploadingSegment = true; + isUploadingSegmentData = true; sendChunk(); await promise; return promise; } finally { - this.isUploadingSegment = false; + channel.removeEventListener("bufferedamountlow", sendChunk); + this.uploadingContext = undefined; } } - protected cancelDataUploading() { - this.isUploadingSegment = false; - } - private receivingCommandBytes(buffer: Uint8Array) { if (!this.commandChunks) { this.commandChunks = new Command.BinaryCommandChunksJoiner( (commandBuffer) => { this.commandChunks = undefined; const command = Command.deserializeCommand(commandBuffer); - this.receiveCommand(command); + this.eventHandlers.onCommandReceived(command); } ); } @@ -102,23 +119,16 @@ export abstract class PeerBase { } } - protected abstract receiveCommand(command: Command.PeerCommand): void; - - protected abstract receiveSegmentChunk(data: Uint8Array): void; - - protected destroy() { + destroy() { this.connection.destroy(); - } - - static getPeerIdFromConnection(connection: PeerConnection) { - return Utils.hexToUtf8(connection.id); + this.eventHandlers.onDestroy(); } } function* getBufferChunks( data: ArrayBuffer, maxChunkSize: number -): Generator { +): Generator { let bytesLeft = data.byteLength; while (bytesLeft > 0) { const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft; diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 98f14cd9..ac22edd2 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -1,5 +1,5 @@ import { PeerConnection } from "bittorrent-tracker"; -import { PeerBase, PeerSettings } from "./peer-base"; +import { PeerInterface, PeerSettings } from "./peer-base"; import { Request, RequestControls, @@ -8,6 +8,8 @@ import { } from "../request"; import * as Command from "./commands"; import { Segment } from "../types"; +import * as Utils from "../utils/utils"; +import debug from "debug"; const { PeerCommandType } = Command; type PeerEventHandlers = { @@ -19,21 +21,33 @@ type PeerEventHandlers = { ) => void; }; -export class Peer extends PeerBase { - private requestContext?: { request: Request; controls: RequestControls }; +export class Peer { + private readonly id: string; + private readonly peerInterface; + private downloadingContext?: { + request: Request; + controls: RequestControls; + isSegmentDataCommandReceived: boolean; + }; private loadedSegments = new Set(); private httpLoadingSegments = new Set(); + private logger = debug("core:peer"); constructor( connection: PeerConnection, private readonly eventHandlers: PeerEventHandlers, - settings: PeerSettings + private readonly settings: PeerSettings ) { - super(connection, settings); + this.id = Peer.getPeerIdFromConnection(connection); + this.peerInterface = new PeerInterface(connection, settings, { + onSegmentChunkReceived: this.onSegmentChunkReceived, + onCommandReceived: this.onCommandReceived, + onDestroy: this.destroy, + }); } get downloadingSegment(): Segment | undefined { - return this.requestContext?.request.segment; + return this.downloadingContext?.request.segment; } getSegmentStatus(segment: Segment): "loaded" | "http-loading" | undefined { @@ -42,7 +56,7 @@ export class Peer extends PeerBase { if (this.httpLoadingSegments.has(externalId)) return "http-loading"; } - protected receiveCommand(command: Command.PeerCommand) { + private onCommandReceived = (command: Command.PeerCommand) => { switch (command.c) { case PeerCommandType.SegmentsAnnouncement: this.loadedSegments = new Set(command.l); @@ -50,15 +64,18 @@ export class Peer extends PeerBase { break; case PeerCommandType.SegmentRequest: + this.peerInterface.stopUploadingSegmentData(); this.eventHandlers.onSegmentRequested(this, command.i, command.b); break; case PeerCommandType.SegmentData: { - const request = this.requestContext?.request; - this.requestContext?.controls.firstBytesReceived(); + if (!this.downloadingContext) break; + this.downloadingContext.isSegmentDataCommandReceived = true; + const { request, controls } = this.downloadingContext; + controls.firstBytesReceived(); if ( - request?.segment.externalId === command.i && + request.segment.externalId === command.i && request.totalBytes === undefined ) { request.setTotalBytes(command.s); @@ -67,40 +84,39 @@ export class Peer extends PeerBase { break; case PeerCommandType.SegmentAbsent: - if (this.requestContext?.request.segment.externalId === command.i) { + if (this.downloadingContext?.request.segment.externalId === command.i) { this.cancelSegmentDownloading("peer-segment-absent"); this.loadedSegments.delete(command.i); } break; case PeerCommandType.CancelSegmentRequest: - this.cancelDataUploading(); + this.peerInterface.stopUploadingSegmentData(); break; } - } + }; - protected receiveSegmentChunk(chunk: Uint8Array): void { - if (!this.requestContext) return; - const { request, controls } = this.requestContext; + protected onSegmentChunkReceived = (chunk: Uint8Array) => { + if (!this.downloadingContext?.isSegmentDataCommandReceived) return; + const { request, controls } = this.downloadingContext; controls.addLoadedChunk(chunk); + if (request.totalBytes === undefined) return; if (request.loadedBytes === request.totalBytes) { controls.completeOnSuccess(); - this.requestContext = undefined; - } else if ( - request.totalBytes !== undefined && - request.loadedBytes > request.totalBytes - ) { + this.downloadingContext = undefined; + } else if (request.loadedBytes > request.totalBytes) { this.cancelSegmentDownloading("peer-response-bytes-mismatch"); } - } + }; downloadSegment(segmentRequest: Request) { - if (this.requestContext) { - throw new Error("Segment already is downloading"); + if (this.downloadingContext) { + throw new Error("Some segment already is downloading"); } - this.requestContext = { + this.downloadingContext = { request: segmentRequest, + isSegmentDataCommandReceived: false, controls: segmentRequest.start( { type: "p2p", peerId: this.id }, { @@ -115,14 +131,14 @@ export class Peer extends PeerBase { i: segmentRequest.segment.externalId, }; if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes; - this.sendCommand(command); + this.peerInterface.sendCommand(command); } private abortSegmentDownloading = () => { - if (!this.requestContext) return; - const { request } = this.requestContext; + if (!this.downloadingContext) return; + const { request } = this.downloadingContext; this.sendCancelSegmentRequestCommand(request.segment); - this.requestContext = undefined; + this.downloadingContext = undefined; }; async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { @@ -132,9 +148,11 @@ export class Peer extends PeerBase { i: segmentExternalId, s: data.byteLength, }; - this.sendCommand(command); + this.peerInterface.sendCommand(command); try { - await this.splitDataToChunksAndUploadAsync(data as Uint8Array); + await this.peerInterface.splitSegmentDataToChunksAndUploadAsync( + data as Uint8Array + ); this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`); } catch (err) { this.logger(`cancel segment uploading ${segmentExternalId}`); @@ -142,16 +160,16 @@ export class Peer extends PeerBase { } private cancelSegmentDownloading(type: PeerRequestErrorType) { - if (!this.requestContext) return; - const { request, controls } = this.requestContext; - const { segment } = request; - this.logger(`cancel segment request ${segment.externalId} (${type})`); - const error = new RequestError(type); + if (!this.downloadingContext) return; + const { request, controls } = this.downloadingContext; if (type === "peer-response-bytes-mismatch") { this.sendCancelSegmentRequestCommand(request.segment); } + const { segment } = request; + this.logger(`cancel segment request ${segment.externalId} (${type})`); + const error = new RequestError(type); controls.abortOnError(error); - this.requestContext = undefined; + this.downloadingContext = undefined; } sendSegmentsAnnouncementCommand( @@ -163,26 +181,30 @@ export class Peer extends PeerBase { p: httpLoadingSegmentsIds, l: loadedSegmentsIds, }; - this.sendCommand(command); + this.peerInterface.sendCommand(command); } sendSegmentAbsentCommand(segmentExternalId: number) { - this.sendCommand({ + this.peerInterface.sendCommand({ c: PeerCommandType.SegmentAbsent, i: segmentExternalId, }); } private sendCancelSegmentRequestCommand(segment: Segment) { - this.sendCommand({ + this.peerInterface.sendCommand({ c: PeerCommandType.CancelSegmentRequest, i: segment.externalId, }); } - destroy() { - super.destroy(); + destroy = () => { + this.peerInterface.destroy(); this.cancelSegmentDownloading("peer-closed"); this.eventHandlers.onPeerClosed(this); + }; + + static getPeerIdFromConnection(connection: PeerConnection) { + return Utils.hexToUtf8(connection.id); } } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index a36cbef9..a63f7eae 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -71,13 +71,24 @@ export class Request { private readonly playback: Playback, private readonly settings: StreamUtils.PlaybackTimeWindowsSettings ) { - this.id = Request.getRequestItemId(segment); + this.id = Request.getRequestItemId(this.segment); + const { byteRange } = this.segment; + if (byteRange) { + const { end, start } = byteRange; + this._totalBytes = end - start + 1; + } this.notReceivingBytesTimeout = new Timeout(this.abortOnTimeout); const { type } = this.segment.stream; this._logger = debug(`core:request-${type}`); } + clearLoadedBytes() { + this._loadedBytes = 0; + this.bytes = []; + this._totalBytes = undefined; + } + get status() { return this._status; } @@ -136,10 +147,14 @@ export class Request { } ): RequestControls { if (this._status === "succeed") { - throw new Error("Request has been already succeed."); + throw new Error( + `Request ${this.segment.externalId} has been already succeed.` + ); } if (this._status === "loading") { - throw new Error("Request has been already started."); + throw new Error( + `Request ${this.segment.externalId} has been already started.` + ); } this._status = "loading"; diff --git a/packages/p2p-media-loader-shaka/src/stream-utils.ts b/packages/p2p-media-loader-shaka/src/stream-utils.ts index a0a955a1..3b45b099 100644 --- a/packages/p2p-media-loader-shaka/src/stream-utils.ts +++ b/packages/p2p-media-loader-shaka/src/stream-utils.ts @@ -19,6 +19,7 @@ export function createSegment({ return { localId: localId ?? getSegmentLocalId(url, byteRange), externalId, + byteRange, url, startTime, endTime,