Skip to content

Commit

Permalink
Add SegmentDataSendingCompleted command.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Dec 29, 2023
1 parent e65b6e5 commit f26233c
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 8 deletions.
1 change: 0 additions & 1 deletion packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ export class HybridLoader {
}

private processQueue() {
console.log("process queue");
const { queue, queueSegmentIds } = QueueUtils.generateQueue({
lastRequestedSegment: this.lastRequestedSegment,
playback: this.playback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export function serializePeerCommand(
switch (command.c) {
case PeerCommandType.CancelSegmentRequest:
case PeerCommandType.SegmentAbsent:
case PeerCommandType.SegmentDataSendingCompleted:
return serializePeerSegmentCommand(command, maxChunkSize);
case PeerCommandType.SegmentRequest:
return serializePeerSegmentRequestCommand(command, maxChunkSize);
Expand Down
5 changes: 4 additions & 1 deletion packages/p2p-media-loader-core/src/p2p/commands/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ export enum PeerCommandType {
SegmentsAnnouncement,
SegmentRequest,
SegmentData,
SegmentDataSendingCompleted,
SegmentAbsent,
CancelSegmentRequest,
}

export type PeerSegmentCommand = BasePeerCommand<
PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest
| PeerCommandType.SegmentAbsent
| PeerCommandType.CancelSegmentRequest
| PeerCommandType.SegmentDataSendingCompleted
> & {
i: number; // segment id
};
Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export class P2PLoader {
return;
}
void peer.uploadSegmentData(
segmentExternalId,
segment,
byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData
);
};
Expand Down
35 changes: 30 additions & 5 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ export class Peer {
controls.firstBytesReceived();
if (request.totalBytes === undefined) {
request.setTotalBytes(command.s);
} else if (request.totalBytes - request.loadedBytes !== command.s) {
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
request.clearLoadedBytes();
}
}
break;

case PeerCommandType.SegmentDataSendingCompleted:
if (this.downloadingContext?.request.segment.externalId === command.i) {
const { request, controls } = this.downloadingContext;
if (request.loadedBytes !== request.totalBytes) {
request.clearLoadedBytes();
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
} else {
controls.completeOnSuccess();
this.downloadingContext = undefined;
}
}
break;
Expand Down Expand Up @@ -151,21 +167,23 @@ export class Peer {
this.peerProtocol.sendCommand(command);
}

async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) {
this.logger(`send segment ${segmentExternalId} to ${this.id}`);
async uploadSegmentData(segment: Segment, data: ArrayBuffer) {
const { externalId } = segment;
this.logger(`send segment ${segment.externalId} to ${this.id}`);
const command: Command.PeerSendSegmentCommand = {
c: PeerCommandType.SegmentData,
i: segmentExternalId,
i: externalId,
s: data.byteLength,
};
this.peerProtocol.sendCommand(command);
try {
await this.peerProtocol.splitSegmentDataToChunksAndUploadAsync(
data as Uint8Array
);
this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`);
this.sendSegmentDataSendingCompletedCommand(segment);
this.logger(`segment ${externalId} has been sent to ${this.id}`);
} catch (err) {
this.logger(`cancel segment uploading ${segmentExternalId}`);
this.logger(`cancel segment uploading ${externalId}`);
}
}

Expand Down Expand Up @@ -209,6 +227,13 @@ export class Peer {
});
}

private sendSegmentDataSendingCompletedCommand(segment: Segment) {
this.peerProtocol.sendCommand({
c: PeerCommandType.SegmentDataSendingCompleted,
i: segment.externalId,
});
}

destroy = () => {
this.cancelSegmentDownloading("peer-closed");
this.eventHandlers.onPeerClosed(this);
Expand Down

0 comments on commit f26233c

Please sign in to comment.