diff --git a/shared/packages/expectationManager/package.json b/shared/packages/expectationManager/package.json index 66cb3cb9..115bb431 100644 --- a/shared/packages/expectationManager/package.json +++ b/shared/packages/expectationManager/package.json @@ -12,6 +12,9 @@ "engines": { "node": ">=14.18.0" }, + "devDependencies": { + "type-fest": "3.13.1" + }, "dependencies": { "@sofie-package-manager/api": "1.50.0", "@sofie-package-manager/worker": "1.50.0", diff --git a/shared/packages/expectationManager/src/expectationTracker/lib/__tests__/workerScaler.spec.ts b/shared/packages/expectationManager/src/expectationTracker/lib/__tests__/workerScaler.spec.ts new file mode 100644 index 00000000..5004dbfd --- /dev/null +++ b/shared/packages/expectationManager/src/expectationTracker/lib/__tests__/workerScaler.spec.ts @@ -0,0 +1,210 @@ +import { PartialDeep } from 'type-fest' +import { + ExpectationId, + LogLevel, + ProcessConfig, + WorkerAgentId, + initializeLogger, + literal, + protectString, + setupLogger, +} from '@sofie-package-manager/api' +import { WorkerScaler } from '../workerScaler' +import { InternalManager } from '../../../internalManager/internalManager' +import { TrackedWorkerAgent } from '../../../internalManager/lib/trackedWorkerAgents' +import { ExpectationTracker } from '../../expectationTracker' +import { TrackedExpectation } from '../../../lib/trackedExpectation' +import { ExpectedPackageStatusAPI } from '@sofie-automation/shared-lib/dist/package-manager/package' + +// --------------------------------------------------------- +const SCALE_UP_COUNT = 1 +const SCALE_UP_TIME = 10 +let isThereExistingWorkers = false +// --------------------------------------------------------- + +const logLevel = LogLevel.WARN +const config = { + process: literal({ + logPath: undefined, + logLevel: undefined, + unsafeSSL: false, + certificates: [], + }), +} +initializeLogger(config) +const logger = setupLogger(config, '', undefined, undefined, logLevel) +logger.warn = jest.fn(logger.warn) as any +logger.error = jest.fn(logger.error) as any + +const requestResourcesForExpectation = jest.fn(async () => false) + +const fakeManager = literal>({ + workforceConnection: { + workforceAPI: { + requestResourcesForExpectation, + }, + }, + workerAgents: { + list: (): { workerId: WorkerAgentId; workerAgent: TrackedWorkerAgent }[] => { + if (isThereExistingWorkers) + return [ + { + workerId: protectString('worker0'), + workerAgent: {} as any as TrackedWorkerAgent, + }, + ] + else return [] + }, + }, +}) as any as InternalManager + +const fakeTracker = literal>({ + constants: { + SCALE_UP_COUNT, + SCALE_UP_TIME, + }, + trackedExpectations: { + list: (): TrackedExpectation[] => { + return expectations + }, + }, + trackedExpectationAPI: { + isExpectationWaitingForOther: (_exp): TrackedExpectation | null => { + return null + }, + }, + getTrackedPackageContainers: () => { + return [] + }, +}) as any as ExpectationTracker +let expectations: TrackedExpectation[] = [] +function setExpectations( + from: { + id: string + state: ExpectedPackageStatusAPI.WorkStatusState + hasAvailableWorkers: boolean + noWorkerAssignedTime?: number + }[] +) { + expectations = Array.from(from).map((e): TrackedExpectation => { + return literal>({ + id: protectString(e.id), + state: e.state, + noWorkerAssignedTime: e.noWorkerAssignedTime ?? null, + availableWorkers: new Set(e.hasAvailableWorkers ? [protectString('worker0')] : []), + + exp: { + statusReport: { + label: `mock${e.id}`, + }, + }, + }) as any as TrackedExpectation + }) +} + +beforeEach(() => { + isThereExistingWorkers = false + expectations = [] + + requestResourcesForExpectation.mockClear() +}) +afterEach(() => { + expect(logger.warn).toHaveBeenCalledTimes(0) + expect(logger.error).toHaveBeenCalledTimes(0) +}) + +test('no expectations', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + await scaler.checkIfNeedToScaleUp() + + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0) +}) +test('1 fulfilled expectation', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = false + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED, + hasAvailableWorkers: true, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0) +}) + +test('1 waiting expectation, no workers', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = false + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.WAITING, + hasAvailableWorkers: false, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) + +test('1 waiting expectation', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = true + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.WAITING, + hasAvailableWorkers: true, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + await sleep(SCALE_UP_TIME * 2) + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) +test('1 expectation, not assigned to worker', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = true + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED, + noWorkerAssignedTime: Date.now() - 1000, + hasAvailableWorkers: true, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + await sleep(SCALE_UP_TIME * 2) + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) +test('1 expectation, no available workers', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = true + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.NEW, + // noWorkerAssignedTime: Date.now() - 1000, + hasAvailableWorkers: false, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + await sleep(SCALE_UP_TIME * 2) + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts b/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts index 6b813867..597b8b3a 100644 --- a/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts +++ b/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts @@ -121,11 +121,8 @@ export class WorkInProgressTracker { }, }) - if (this.tracker.trackedExpectationAPI.onExpectationFulfilled(wip.trackedExp)) { - // Something was triggered, run again asap. - // We should reevaluate asap, so that any other expectation which might be waiting on this work could start. - this.tracker.triggerEvaluationNow() - } + // Trigger another evaluation ASAP, since a worker is free now, another expectation might be able to start: + this.tracker.triggerEvaluationNow() } else { // Expectation not in WORKING state, ignore } diff --git a/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts b/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts index ff26c549..a5649dc5 100644 --- a/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts +++ b/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts @@ -57,6 +57,9 @@ export class WorkerScaler { this.waitingExpectations = [] for (const exp of this.tracker.trackedExpectations.list()) { + /** The expectation is waiting on another expectation */ + const isWaitingForOther = this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) + /** The expectation is waiting for a worker */ const isWaiting: boolean = exp.state === ExpectedPackageStatusAPI.WorkStatusState.NEW || @@ -64,17 +67,16 @@ export class WorkerScaler { exp.state === ExpectedPackageStatusAPI.WorkStatusState.READY /** Not supported by any worker */ - const notSupportedByAnyWorker: boolean = exp.availableWorkers.size === 0 + const notSupportedByAnyWorker = exp.availableWorkers.size === 0 + /** No worker has had time to work on it lately */ - const notAssignedToAnyWorker: boolean = + const notAssignedToAnyWorkerForSomeTime: boolean = !!exp.noWorkerAssignedTime && Date.now() - exp.noWorkerAssignedTime > this.tracker.constants.SCALE_UP_TIME - if ( - isWaiting && - (notSupportedByAnyWorker || notAssignedToAnyWorker) && - !this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) // Filter out expectations that aren't ready to begin working on anyway - ) { + // Is the expectation waiting for resources? + if (!isWaitingForOther && (isWaiting || notSupportedByAnyWorker || notAssignedToAnyWorkerForSomeTime)) { + // Add a second round of waiting, to ensure that we don't scale up prematurely: if (!exp.waitingForWorkerTime) { this.logger.silly( `Starting to track how long expectation "${expLabel(exp)}" has been waiting for a worker` @@ -84,6 +86,8 @@ export class WorkerScaler { } else { exp.waitingForWorkerTime = null } + + // If the expectation has been waiting for long enough: if (exp.waitingForWorkerTime) { const hasBeenWaitingFor = Date.now() - exp.waitingForWorkerTime if (