diff --git a/shared/packages/api/src/logger.ts b/shared/packages/api/src/logger.ts index 6d4d64f8..be0cce48 100644 --- a/shared/packages/api/src/logger.ts +++ b/shared/packages/api/src/logger.ts @@ -101,7 +101,9 @@ export function setupLogger( }) if (initialLogLevel) setLogLevel(initialLogLevel, true) - logger.info('Logging to', logPath) + if (handleProcess) { + logger.info(`Logging to "${logPath}"`) + } } else { const transportConsole = new Winston.transports.Console({ level: logLevel, diff --git a/shared/packages/expectationManager/package.json b/shared/packages/expectationManager/package.json index 66cb3cb9..115bb431 100644 --- a/shared/packages/expectationManager/package.json +++ b/shared/packages/expectationManager/package.json @@ -12,6 +12,9 @@ "engines": { "node": ">=14.18.0" }, + "devDependencies": { + "type-fest": "3.13.1" + }, "dependencies": { "@sofie-package-manager/api": "1.50.0", "@sofie-package-manager/worker": "1.50.0", diff --git a/shared/packages/expectationManager/src/evaluationRunner/evaluateExpectationStates/ready.ts b/shared/packages/expectationManager/src/evaluationRunner/evaluateExpectationStates/ready.ts index 2c37d647..03cdb593 100644 --- a/shared/packages/expectationManager/src/evaluationRunner/evaluateExpectationStates/ready.ts +++ b/shared/packages/expectationManager/src/evaluationRunner/evaluateExpectationStates/ready.ts @@ -83,7 +83,7 @@ export async function evaluateExpectationStateReady({ } } else { // No worker is available at the moment. - // Check if anough time has passed if it makes sense to check for new workers again: + // Check if enough time has passed if it makes sense to check for new workers again: if ( trackedExp.noWorkerAssignedTime && diff --git a/shared/packages/expectationManager/src/expectationTracker/lib/__tests__/workerScaler.spec.ts b/shared/packages/expectationManager/src/expectationTracker/lib/__tests__/workerScaler.spec.ts new file mode 100644 index 00000000..5004dbfd --- /dev/null +++ b/shared/packages/expectationManager/src/expectationTracker/lib/__tests__/workerScaler.spec.ts @@ -0,0 +1,210 @@ +import { PartialDeep } from 'type-fest' +import { + ExpectationId, + LogLevel, + ProcessConfig, + WorkerAgentId, + initializeLogger, + literal, + protectString, + setupLogger, +} from '@sofie-package-manager/api' +import { WorkerScaler } from '../workerScaler' +import { InternalManager } from '../../../internalManager/internalManager' +import { TrackedWorkerAgent } from '../../../internalManager/lib/trackedWorkerAgents' +import { ExpectationTracker } from '../../expectationTracker' +import { TrackedExpectation } from '../../../lib/trackedExpectation' +import { ExpectedPackageStatusAPI } from '@sofie-automation/shared-lib/dist/package-manager/package' + +// --------------------------------------------------------- +const SCALE_UP_COUNT = 1 +const SCALE_UP_TIME = 10 +let isThereExistingWorkers = false +// --------------------------------------------------------- + +const logLevel = LogLevel.WARN +const config = { + process: literal({ + logPath: undefined, + logLevel: undefined, + unsafeSSL: false, + certificates: [], + }), +} +initializeLogger(config) +const logger = setupLogger(config, '', undefined, undefined, logLevel) +logger.warn = jest.fn(logger.warn) as any +logger.error = jest.fn(logger.error) as any + +const requestResourcesForExpectation = jest.fn(async () => false) + +const fakeManager = literal>({ + workforceConnection: { + workforceAPI: { + requestResourcesForExpectation, + }, + }, + workerAgents: { + list: (): { workerId: WorkerAgentId; workerAgent: TrackedWorkerAgent }[] => { + if (isThereExistingWorkers) + return [ + { + workerId: protectString('worker0'), + workerAgent: {} as any as TrackedWorkerAgent, + }, + ] + else return [] + }, + }, +}) as any as InternalManager + +const fakeTracker = literal>({ + constants: { + SCALE_UP_COUNT, + SCALE_UP_TIME, + }, + trackedExpectations: { + list: (): TrackedExpectation[] => { + return expectations + }, + }, + trackedExpectationAPI: { + isExpectationWaitingForOther: (_exp): TrackedExpectation | null => { + return null + }, + }, + getTrackedPackageContainers: () => { + return [] + }, +}) as any as ExpectationTracker +let expectations: TrackedExpectation[] = [] +function setExpectations( + from: { + id: string + state: ExpectedPackageStatusAPI.WorkStatusState + hasAvailableWorkers: boolean + noWorkerAssignedTime?: number + }[] +) { + expectations = Array.from(from).map((e): TrackedExpectation => { + return literal>({ + id: protectString(e.id), + state: e.state, + noWorkerAssignedTime: e.noWorkerAssignedTime ?? null, + availableWorkers: new Set(e.hasAvailableWorkers ? [protectString('worker0')] : []), + + exp: { + statusReport: { + label: `mock${e.id}`, + }, + }, + }) as any as TrackedExpectation + }) +} + +beforeEach(() => { + isThereExistingWorkers = false + expectations = [] + + requestResourcesForExpectation.mockClear() +}) +afterEach(() => { + expect(logger.warn).toHaveBeenCalledTimes(0) + expect(logger.error).toHaveBeenCalledTimes(0) +}) + +test('no expectations', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + await scaler.checkIfNeedToScaleUp() + + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0) +}) +test('1 fulfilled expectation', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = false + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED, + hasAvailableWorkers: true, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0) +}) + +test('1 waiting expectation, no workers', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = false + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.WAITING, + hasAvailableWorkers: false, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) + +test('1 waiting expectation', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = true + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.WAITING, + hasAvailableWorkers: true, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + await sleep(SCALE_UP_TIME * 2) + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) +test('1 expectation, not assigned to worker', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = true + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED, + noWorkerAssignedTime: Date.now() - 1000, + hasAvailableWorkers: true, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + await sleep(SCALE_UP_TIME * 2) + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) +test('1 expectation, no available workers', async () => { + const scaler = new WorkerScaler(logger, fakeManager, fakeTracker) + + isThereExistingWorkers = true + setExpectations([ + { + id: 'exp0', + state: ExpectedPackageStatusAPI.WorkStatusState.NEW, + // noWorkerAssignedTime: Date.now() - 1000, + hasAvailableWorkers: false, + }, + ]) + + await scaler.checkIfNeedToScaleUp() + await sleep(SCALE_UP_TIME * 2) + await scaler.checkIfNeedToScaleUp() + expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1) +}) + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts b/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts index 6b813867..597b8b3a 100644 --- a/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts +++ b/shared/packages/expectationManager/src/expectationTracker/lib/workInProgressTracker.ts @@ -121,11 +121,8 @@ export class WorkInProgressTracker { }, }) - if (this.tracker.trackedExpectationAPI.onExpectationFulfilled(wip.trackedExp)) { - // Something was triggered, run again asap. - // We should reevaluate asap, so that any other expectation which might be waiting on this work could start. - this.tracker.triggerEvaluationNow() - } + // Trigger another evaluation ASAP, since a worker is free now, another expectation might be able to start: + this.tracker.triggerEvaluationNow() } else { // Expectation not in WORKING state, ignore } diff --git a/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts b/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts index ff26c549..a5649dc5 100644 --- a/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts +++ b/shared/packages/expectationManager/src/expectationTracker/lib/workerScaler.ts @@ -57,6 +57,9 @@ export class WorkerScaler { this.waitingExpectations = [] for (const exp of this.tracker.trackedExpectations.list()) { + /** The expectation is waiting on another expectation */ + const isWaitingForOther = this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) + /** The expectation is waiting for a worker */ const isWaiting: boolean = exp.state === ExpectedPackageStatusAPI.WorkStatusState.NEW || @@ -64,17 +67,16 @@ export class WorkerScaler { exp.state === ExpectedPackageStatusAPI.WorkStatusState.READY /** Not supported by any worker */ - const notSupportedByAnyWorker: boolean = exp.availableWorkers.size === 0 + const notSupportedByAnyWorker = exp.availableWorkers.size === 0 + /** No worker has had time to work on it lately */ - const notAssignedToAnyWorker: boolean = + const notAssignedToAnyWorkerForSomeTime: boolean = !!exp.noWorkerAssignedTime && Date.now() - exp.noWorkerAssignedTime > this.tracker.constants.SCALE_UP_TIME - if ( - isWaiting && - (notSupportedByAnyWorker || notAssignedToAnyWorker) && - !this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) // Filter out expectations that aren't ready to begin working on anyway - ) { + // Is the expectation waiting for resources? + if (!isWaitingForOther && (isWaiting || notSupportedByAnyWorker || notAssignedToAnyWorkerForSomeTime)) { + // Add a second round of waiting, to ensure that we don't scale up prematurely: if (!exp.waitingForWorkerTime) { this.logger.silly( `Starting to track how long expectation "${expLabel(exp)}" has been waiting for a worker` @@ -84,6 +86,8 @@ export class WorkerScaler { } else { exp.waitingForWorkerTime = null } + + // If the expectation has been waiting for long enough: if (exp.waitingForWorkerTime) { const hasBeenWaitingFor = Date.now() - exp.waitingForWorkerTime if ( diff --git a/tests/internal-tests/src/__mocks__/child_process.ts b/tests/internal-tests/src/__mocks__/child_process.ts index 721fe02e..8ce2ba75 100644 --- a/tests/internal-tests/src/__mocks__/child_process.ts +++ b/tests/internal-tests/src/__mocks__/child_process.ts @@ -7,6 +7,7 @@ import path from 'path' const fsCopyFile = promisify(fs.copyFile) const fsMkdir = promisify(fs.mkdir) +const fsStat = promisify(fs.stat) const child_process: any = jest.createMockFromModule('child_process') @@ -173,7 +174,13 @@ async function robocopy(spawned: SpawnedProcess, args: string[]) { const source = path.join(sourceFolder, file) const destination = path.join(destinationFolder, file) - await fsMkdir(destinationFolder) // robocopy automatically creates the destination folder + try { + await fsStat(destinationFolder) + } catch (e) { + if (`${e}`.match(/ENOENT/)) { + await fsMkdir(destinationFolder) // robocopy automatically creates the destination folder + } else throw e + } await fsCopyFile(source, destination) } @@ -270,4 +277,5 @@ async function netUse(commandString: string): Promise<{ stdout: string; stderr: return { stdout, stderr } } } + module.exports = child_process diff --git a/tests/internal-tests/src/__tests__/basic.spec.ts b/tests/internal-tests/src/__tests__/basic.spec.ts index 9e5edbc9..e444c477 100644 --- a/tests/internal-tests/src/__tests__/basic.spec.ts +++ b/tests/internal-tests/src/__tests__/basic.spec.ts @@ -64,6 +64,7 @@ describeForAllPlatforms( }) beforeEach(() => { fs.__mockReset() + fs.__restoreCallbackInterceptor() env.reset() QGatewayClient.resetMock() }) @@ -294,6 +295,85 @@ describeForAllPlatforms( expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual('fulfilled') }) + + test('Be able to copy 10 local files', async () => { + const COUNT = 10 + + const COPY_TIME = env.SCALE_UP_TIME + + fs.__setCallbackInterceptor((type, cb) => { + if (type === 'copyFile') { + // Slow down copies: + setTimeout(cb, COPY_TIME) + } else cb() + }) + + const workforceRequestResourcesForExpectation = jest.fn(env.workforce.requestResourcesForExpectation) + env.workforce.requestResourcesForExpectation = workforceRequestResourcesForExpectation + + const expectations: Record = {} + fs.__mockSetDirectory('/targets/target0') + for (let i = 0; i < COUNT; i++) { + fs.__mockSetFile(`/sources/source0/file${i}Source.mp4`, 1234) + + const EXP_copy = protectString(`copy${i}`) + const PACKAGE = protectString(`package${i}`) + + expectations[EXP_copy] = literal({ + id: EXP_copy, + priority: 0, + managerId: MANAGER0, + fromPackages: [{ id: PACKAGE, expectedContentVersionHash: 'abcd1234' }], + type: Expectation.Type.FILE_COPY, + statusReport: { + label: `Copy file${i}`, + description: `Copy file${i} because test`, + requiredForPlayout: true, + displayRank: 0, + sendReport: true, + }, + startRequirement: { + sources: [getLocalSource(SOURCE0, `file${i}Source.mp4`)], + }, + endRequirement: { + targets: [getLocalTarget(TARGET0, `myFolder/file${i}Target.mp4`)], + content: { + filePath: `file${i}Target.mp4`, + }, + version: { type: Expectation.Version.Type.FILE_ON_DISK }, + }, + workOptions: {}, + }) + } + // console.log(fs.__printAllFiles()) + + env.expectationManager.updateExpectations(expectations) + + // Wait for the job to complete: + await waitUntil(() => { + expect(env.containerStatuses[TARGET0]).toBeTruthy() + }, env.WAIT_JOB_TIME) + + await waitUntil(() => { + const packageStatuses: any = { + actual: {}, + expected: {}, + } + for (const exp of Object.values(expectations)) { + const PACKAGE = exp.fromPackages[0].id + + packageStatuses.actual[PACKAGE] = + env.containerStatuses[TARGET0].packages[PACKAGE]?.packageStatus?.status + packageStatuses.expected[PACKAGE] = + ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.READY + } + expect(packageStatuses.actual).toMatchObject(packageStatuses.expected) + }, env.WAIT_JOB_TIME * 2 + COPY_TIME * 10) + + // Expect there to be requests to scale up workers: + expect(workforceRequestResourcesForExpectation.mock.calls.length).toBeGreaterThan(5) + expect(workforceRequestResourcesForExpectation.mock.calls.length).toBeLessThan(12) + }, 2000) } ) diff --git a/tests/internal-tests/src/__tests__/lib/setupEnv.ts b/tests/internal-tests/src/__tests__/lib/setupEnv.ts index e901a756..f210e2d7 100644 --- a/tests/internal-tests/src/__tests__/lib/setupEnv.ts +++ b/tests/internal-tests/src/__tests__/lib/setupEnv.ts @@ -26,6 +26,7 @@ import { objectKeys, ExpectationManagerId, AppContainerId, + literal, } from '@sofie-package-manager/api' import { ExpectationManager, @@ -198,6 +199,8 @@ export async function prepareTestEnviromnent(debugLogging: boolean): Promise({ constants: { EVALUATE_INTERVAL: WAIT_SCAN_TIME - WAIT_JOB_TIME - 300, FULLFILLED_MONITOR_TIME: WAIT_SCAN_TIME - WAIT_JOB_TIME - 300, WORK_TIMEOUT_TIME: WORK_TIMEOUT_TIME - 300, ERROR_WAIT_TIME: ERROR_WAIT_TIME - 300, + + SCALE_UP_TIME: SCALE_UP_TIME, }, - }, + }), logFilterFunction ) @@ -311,6 +316,7 @@ export async function prepareTestEnviromnent(debugLogging: boolean): Promise