From f26233c1c02d9af0f980fad4f7483274f50d6f66 Mon Sep 17 00:00:00 2001 From: Igor Zolotarenko Date: Fri, 29 Dec 2023 13:43:48 +0200 Subject: [PATCH] Add SegmentDataSendingCompleted command. --- .../src/hybrid-loader.ts | 1 - .../src/p2p/commands/commands.ts | 1 + .../src/p2p/commands/types.ts | 5 ++- .../p2p-media-loader-core/src/p2p/loader.ts | 2 +- .../p2p-media-loader-core/src/p2p/peer.ts | 35 ++++++++++++++++--- 5 files changed, 36 insertions(+), 8 deletions(-) diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index dfe688a4..9b9841a6 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -191,7 +191,6 @@ export class HybridLoader { } private processQueue() { - console.log("process queue"); const { queue, queueSegmentIds } = QueueUtils.generateQueue({ lastRequestedSegment: this.lastRequestedSegment, playback: this.playback, diff --git a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts index 30ddeca6..d5cdd6f9 100644 --- a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts +++ b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts @@ -61,6 +61,7 @@ export function serializePeerCommand( switch (command.c) { case PeerCommandType.CancelSegmentRequest: case PeerCommandType.SegmentAbsent: + case PeerCommandType.SegmentDataSendingCompleted: return serializePeerSegmentCommand(command, maxChunkSize); case PeerCommandType.SegmentRequest: return serializePeerSegmentRequestCommand(command, maxChunkSize); diff --git a/packages/p2p-media-loader-core/src/p2p/commands/types.ts b/packages/p2p-media-loader-core/src/p2p/commands/types.ts index 27e9b144..cbe2aba3 100644 --- a/packages/p2p-media-loader-core/src/p2p/commands/types.ts +++ b/packages/p2p-media-loader-core/src/p2p/commands/types.ts @@ -6,12 +6,15 @@ export enum PeerCommandType { SegmentsAnnouncement, SegmentRequest, SegmentData, + SegmentDataSendingCompleted, SegmentAbsent, CancelSegmentRequest, } export type PeerSegmentCommand = BasePeerCommand< - PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest + | PeerCommandType.SegmentAbsent + | PeerCommandType.CancelSegmentRequest + | PeerCommandType.SegmentDataSendingCompleted > & { i: number; // segment id }; diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index cfda5d49..c276ba8b 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -120,7 +120,7 @@ export class P2PLoader { return; } void peer.uploadSegmentData( - segmentExternalId, + segment, byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData ); }; diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 7d8e3c0c..ab98e4c9 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -81,6 +81,22 @@ export class Peer { controls.firstBytesReceived(); if (request.totalBytes === undefined) { request.setTotalBytes(command.s); + } else if (request.totalBytes - request.loadedBytes !== command.s) { + this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + request.clearLoadedBytes(); + } + } + break; + + case PeerCommandType.SegmentDataSendingCompleted: + if (this.downloadingContext?.request.segment.externalId === command.i) { + const { request, controls } = this.downloadingContext; + if (request.loadedBytes !== request.totalBytes) { + request.clearLoadedBytes(); + this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + } else { + controls.completeOnSuccess(); + this.downloadingContext = undefined; } } break; @@ -151,11 +167,12 @@ export class Peer { this.peerProtocol.sendCommand(command); } - async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { - this.logger(`send segment ${segmentExternalId} to ${this.id}`); + async uploadSegmentData(segment: Segment, data: ArrayBuffer) { + const { externalId } = segment; + this.logger(`send segment ${segment.externalId} to ${this.id}`); const command: Command.PeerSendSegmentCommand = { c: PeerCommandType.SegmentData, - i: segmentExternalId, + i: externalId, s: data.byteLength, }; this.peerProtocol.sendCommand(command); @@ -163,9 +180,10 @@ export class Peer { await this.peerProtocol.splitSegmentDataToChunksAndUploadAsync( data as Uint8Array ); - this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`); + this.sendSegmentDataSendingCompletedCommand(segment); + this.logger(`segment ${externalId} has been sent to ${this.id}`); } catch (err) { - this.logger(`cancel segment uploading ${segmentExternalId}`); + this.logger(`cancel segment uploading ${externalId}`); } } @@ -209,6 +227,13 @@ export class Peer { }); } + private sendSegmentDataSendingCompletedCommand(segment: Segment) { + this.peerProtocol.sendCommand({ + c: PeerCommandType.SegmentDataSendingCompleted, + i: segment.externalId, + }); + } + destroy = () => { this.cancelSegmentDownloading("peer-closed"); this.eventHandlers.onPeerClosed(this);