Skip to content

Commit

Permalink
Merge with v1.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Jan 3, 2024
2 parents 099fb48 + f9382a6 commit 9d679e5
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 106 deletions.
2 changes: 1 addition & 1 deletion p2p-media-loader-demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<script type="text/javascript"
src="https://cdn.jsdelivr.net/gh/clappr/clappr-level-selector-plugin@latest/dist/level-selector.min.js"></script>
<script type="text/javascript"
src="https://cdn.jsdelivr.net/npm/shaka-player@~4.6.0/dist/shaka-player.compiled.min.js"></script>
src="https://cdn.jsdelivr.net/npm/shaka-player@~4.4.0/dist/shaka-player.compiled.min.js"></script>
<script type="text/javascript"
src="https://cdn.jsdelivr.net/gh/clappr/dash-shaka-playback@latest/dist/dash-shaka-playback.external.js"></script>
</head>
Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { EngineCallbacks } from "./request";
import { EngineCallbacks } from "./requests/engine-request";
import { SegmentsMemoryStorage } from "./segments-storage";

export class Core<TStream extends Stream = Stream> {
Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/http-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
RequestError,
HttpRequestErrorType,
RequestControls,
} from "./request";
} from "./requests/request";

type HttpSettings = Pick<Settings, "httpNotReceivingBytesTimeoutMs">;

Expand Down
89 changes: 52 additions & 37 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { SegmentsMemoryStorage } from "./segments-storage";
import { Settings, CoreEventHandlers, Playback } from "./types";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { P2PLoadersContainer } from "./p2p/loaders-container";
import { RequestsContainer } from "./request-container";
import { EngineCallbacks } from "./request";
import { RequestsContainer } from "./requests/request-container";
import { EngineRequest, EngineCallbacks } from "./requests/engine-request";
import * as QueueUtils from "./utils/queue";
import * as LoggerUtils from "./utils/logger";
import * as StreamUtils from "./utils/stream";
Expand All @@ -25,6 +25,7 @@ export class HybridLoader {
private randomHttpDownloadInterval!: number;
private readonly logger: debug.Debugger;
private isProcessQueueMicrotaskCreated = false;
private readonly engineRequests = new Map<Segment, EngineRequest>();

constructor(
private streamManifestUrl: string,
Expand Down Expand Up @@ -88,19 +89,18 @@ export class HybridLoader {
}
this.lastRequestedSegment = segment;

const engineRequest = new EngineRequest(segment, callbacks);
if (this.segmentStorage.hasSegment(segment)) {
// TODO: error handling
const data = await this.segmentStorage.getSegmentData(segment);
if (data) {
callbacks.onSuccess({
engineRequest.resolve(
data,
bandwidth:
this.bandwidthCalculator.getBandwidthForLastNSplicedSeconds(3),
});
this.bandwidthCalculator.getBandwidthForLastNSplicedSeconds(3),
);
}
} else {
const request = this.requests.getOrCreateRequest(segment);
request.setEngineCallbacks(callbacks);
this.engineRequests.set(segment, engineRequest);
}
this.requestProcessQueueMicrotask();
}
Expand Down Expand Up @@ -132,53 +132,45 @@ export class HybridLoader {
const { httpErrorRetries } = this.settings;
const now = performance.now();
for (const request of this.requests.items()) {
const {
type,
status,
segment,
isHandledByProcessQueue,
isSegmentRequestedByEngine,
} = request;

if (!type) continue;
const { type, status, segment, isHandledByProcessQueue } = request;
const engineRequest = this.engineRequests.get(segment);

switch (status) {
case "loading":
if (
!isSegmentRequestedByEngine &&
!queueSegmentIds.has(segment.localId)
) {
if (!queueSegmentIds.has(segment.localId) && !engineRequest) {
request.abortFromProcessQueue();
this.requests.remove(request);
}
break;

case "succeed":
if (!request.data) break;
if (!request.data || !type) break;
if (type === "http") {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
request.resolveEngineCallbacksSuccessfully();
engineRequest?.resolve(
request.data,
this.bandwidthCalculator.getBandwidthForLastNSeconds(3)
);
this.engineRequests.delete(segment);
this.requests.remove(request);
void this.segmentStorage.storeSegment(request.segment, request.data);
this.eventHandlers?.onSegmentLoaded?.(request.data.byteLength, type);
this.requests.remove(request);
break;

case "failed":
if (type === "http" && !isHandledByProcessQueue) {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
if (
!isSegmentRequestedByEngine &&
!stream.segments.has(request.segment.localId)
) {
if (!engineRequest && !stream.segments.has(request.segment.localId)) {
this.requests.remove(request);
}
if (
request.failedAttempts.httpAttemptsCount >= httpErrorRetries &&
isSegmentRequestedByEngine
engineRequest
) {
request.resolveEngineCallbacksWithError();
engineRequest.reject();
this.engineRequests.delete(segment);
}
break;

Expand All @@ -190,8 +182,8 @@ export class HybridLoader {
this.requests.remove(request);
break;
}
request.markHandledByProcessQueue();

request.markHandledByProcessQueue();
const { lastAttempt } = request.failedAttempts;
if (
lastAttempt &&
Expand Down Expand Up @@ -222,6 +214,22 @@ export class HybridLoader {
httpErrorRetries,
} = this.settings;

for (const engineRequest of this.engineRequests.values()) {
if (this.requests.executingHttpCount >= simultaneousHttpDownloads) break;
const request = this.requests.get(engineRequest.segment);
if (
!queueSegmentIds.has(engineRequest.segment.localId) &&
engineRequest.status === "pending" &&
(!request ||
request.status === "not-started" ||
(request.status === "failed" &&
request.failedAttempts.httpAttemptsCount <
this.settings.httpErrorRetries))
) {
void this.loadThroughHttp(engineRequest.segment);
}
}

for (const item of queue) {
const { statuses, segment } = item;
const request = this.requests.get(segment);
Expand All @@ -240,7 +248,7 @@ export class HybridLoader {
request?.status === "loading" && request.type === "p2p";

if (this.requests.executingHttpCount < simultaneousHttpDownloads) {
if (isP2PLoadingRequest) request.abortFromEngine();
if (isP2PLoadingRequest) request.abortFromProcessQueue();
void this.loadThroughHttp(segment);
continue;
}
Expand All @@ -249,7 +257,7 @@ export class HybridLoader {
this.abortLastHttpLoadingInQueueAfterItem(queue, segment) &&
this.requests.executingHttpCount < simultaneousHttpDownloads
) {
if (isP2PLoadingRequest) request.abortFromEngine();
if (isP2PLoadingRequest) request.abortFromProcessQueue();
void this.loadThroughHttp(segment);
continue;
}
Expand Down Expand Up @@ -289,10 +297,17 @@ export class HybridLoader {

// api method for engines
abortSegmentRequest(segmentLocalId: string) {
const request = this.requests.getBySegmentLocalId(segmentLocalId);
if (!request) return;
request.abortFromEngine();
this.logger("abort: ", LoggerUtils.getSegmentString(request.segment));
for (const engineRequest of this.engineRequests.values()) {
if (segmentLocalId === engineRequest.segment.localId) {
engineRequest.abort();
this.engineRequests.delete(engineRequest.segment);
this.logger(
"abort: ",
LoggerUtils.getSegmentString(engineRequest.segment)
);
break;
}
}
}

private async loadThroughHttp(segment: Segment) {
Expand Down
4 changes: 2 additions & 2 deletions packages/p2p-media-loader-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

export { Core } from "./core";
export type * from "./types";
export { CoreRequestError } from "./request";
export type { EngineCallbacks } from "./request";
export { CoreRequestError } from "./requests/engine-request";
export type { EngineCallbacks } from "./requests/engine-request";
4 changes: 2 additions & 2 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Peer } from "./peer";
import { Segment, Settings, StreamWithSegments } from "../types";
import { SegmentsMemoryStorage } from "../segments-storage";
import { RequestsContainer } from "../request-container";
import { Request } from "../request";
import { RequestsContainer } from "../requests/request-container";
import { Request } from "../requests/request";
import { P2PTrackerClient } from "./tracker-client";
import * as StreamUtils from "../utils/stream";
import * as Utils from "../utils/utils";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { P2PLoader } from "./loader";
import debug from "debug";
import { Settings, Stream, StreamWithSegments } from "../index";
import { RequestsContainer } from "../request-container";
import { RequestsContainer } from "../requests/request-container";
import { SegmentsMemoryStorage } from "../segments-storage";
import * as LoggerUtils from "../utils/logger";

Expand Down
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
RequestError,
PeerRequestErrorType,
RequestInnerErrorType,
} from "../request";
} from "../requests/request";
import * as Command from "./commands";
import { Segment } from "../types";
import * as Utils from "../utils/utils";
Expand Down
49 changes: 49 additions & 0 deletions packages/p2p-media-loader-core/src/requests/engine-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Segment, SegmentResponse } from "../types";

export type EngineCallbacks = {
onSuccess: (response: SegmentResponse) => void;
onError: (reason: CoreRequestError) => void;
};

export class EngineRequest {
private _status: "pending" | "succeed" | "failed" | "aborted" = "pending";

constructor(
readonly segment: Segment,
readonly engineCallbacks: EngineCallbacks
) {}

get status() {
return this._status;
}

resolve(data: ArrayBuffer, bandwidth: number) {
this.throwErrorIfNotPending();
this._status = "succeed";
this.engineCallbacks.onSuccess({ data, bandwidth });
}

reject() {
this.throwErrorIfNotPending();
this._status = "failed";
this.engineCallbacks.onError(new CoreRequestError("failed"));
}

abort() {
this.throwErrorIfNotPending();
this._status = "aborted";
this.engineCallbacks.onError(new CoreRequestError("aborted"));
}

private throwErrorIfNotPending() {
if (this._status !== "pending") {
throw new Error("Engine request has been already settled.");
}
}
}

export class CoreRequestError extends Error {
constructor(readonly type: "failed" | "aborted") {
super();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Segment, Settings, Playback } from "./types";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { Segment, Settings, Playback } from "../types";
import { BandwidthCalculator } from "../bandwidth-calculator";
import { Request } from "./request";

export class RequestsContainer {
Expand Down Expand Up @@ -32,12 +32,6 @@ export class RequestsContainer {
return this.requests.get(segment);
}

getBySegmentLocalId(id: string) {
for (const request of this.requests.values()) {
if (request.segment.localId === id) return request;
}
}

getOrCreateRequest(segment: Segment) {
let request = this.requests.get(segment);
if (!request) {
Expand Down Expand Up @@ -76,7 +70,6 @@ export class RequestsContainer {
destroy() {
for (const request of this.requests.values()) {
request.abortFromProcessQueue();
request.abortFromEngine();
}
this.requests.clear();
}
Expand Down
Loading

0 comments on commit 9d679e5

Please sign in to comment.