diff --git a/apps/package-manager/packages/generic/src/generateExpectations/nrk/packageContainerExpectations.ts b/apps/package-manager/packages/generic/src/generateExpectations/nrk/packageContainerExpectations.ts index 4372a867..1ceffd62 100644 --- a/apps/package-manager/packages/generic/src/generateExpectations/nrk/packageContainerExpectations.ts +++ b/apps/package-manager/packages/generic/src/generateExpectations/nrk/packageContainerExpectations.ts @@ -52,6 +52,7 @@ export function getPackageContainerExpectations( targetLayers: ['source-smartbull'], // not used, since the layers of the original smartbull-package are used usePolling: 2000, awaitWriteFinishStabilityThreshold: 2000, + warningLimit: 3000, // We seem to get performance issues at around 9000 (when polling network drives), so 3000 should give us an early warning }, }, } diff --git a/shared/packages/api/src/logger.ts b/shared/packages/api/src/logger.ts index 4f3b1d63..9daca243 100644 --- a/shared/packages/api/src/logger.ts +++ b/shared/packages/api/src/logger.ts @@ -61,7 +61,8 @@ export function setupLogger( category: string, categoryLabel?: string, handleProcess = false, - initialLogLevel?: LogLevel + initialLogLevel?: LogLevel, + filterFcn?: (level: string, ...args: any[]) => boolean ): LoggerInstance { if (!loggerContainer) throw new Error('Logging has not been set up! initializeLogger() must be called first.') @@ -141,9 +142,31 @@ export function setupLogger( `${category ? `${category}.` : ''}${subCategory}`, subLabel && `${categoryLabel}>${subLabel}`, undefined, - initialLogLevel + initialLogLevel, + filterFcn ) } + if (filterFcn) { + for (const methodName of [ + 'error', + 'warn', + 'help', + 'data', + 'info', + 'debug', + 'prompt', + 'http', + 'verbose', + 'input', + 'silly', + ]) { + const orgMethod = (loggerInstance as any)[methodName] + ;(loggerInstance as any)[methodName] = (...args: any[]) => { + if (filterFcn(methodName, ...args)) orgMethod.call(loggerInstance, ...args) + } + } + } + allLoggers.set(category, loggerInstance) return loggerInstance } diff --git a/shared/packages/api/src/packageContainerApi.ts b/shared/packages/api/src/packageContainerApi.ts index 297184cc..2d7ed7c1 100644 --- a/shared/packages/api/src/packageContainerApi.ts +++ b/shared/packages/api/src/packageContainerApi.ts @@ -29,10 +29,12 @@ export interface PackageContainerExpectation extends PackageContainer { /** If set, ignore any files matching this. (Regular expression). */ ignore?: string - /** If set, the monitoring will be using polling */ + /** If set, the monitoring will be using polling, at the given interval [ms] */ usePolling?: number | null /** If set, will set the awaitWriteFinish.StabilityThreshold of chokidar */ awaitWriteFinishStabilityThreshold?: number | null + /** If set, the monitor will warn if the monitored number of packages is greater than this */ + warningLimit?: number /** What layers to set on the resulting ExpectedPackage */ targetLayers: string[] diff --git a/shared/packages/worker/src/worker/accessorHandlers/genericHandle.ts b/shared/packages/worker/src/worker/accessorHandlers/genericHandle.ts index 4626aaa1..66b495ef 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/genericHandle.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/genericHandle.ts @@ -6,6 +6,8 @@ import { HelpfulEventEmitter, AccessorId, MonitorId, + promiseTimeout, + INNER_ACTION_TIMEOUT, } from '@sofie-package-manager/api' import { GenericWorker } from '../worker' import { MonitorInProgress } from '../lib/monitorInProgress' @@ -20,7 +22,49 @@ export abstract class GenericAccessorHandle { protected _accessor: AccessorOnPackage.Any, protected _content: unknown, public readonly type: string - ) {} + ) { + // Wrap all accessor methods which return promises into promiseTimeout. + // This is to get a finer grained logging, in case of a timeout: + + /** List of all methods */ + const methodsToWrap: Array> = [ + 'checkPackageReadAccess', + 'tryPackageRead', + 'checkPackageContainerWriteAccess', + 'getPackageActualVersion', + 'removePackage', + 'fetchMetadata', + 'updateMetadata', + 'removeMetadata', + 'getPackageReadStream', + 'putPackageStream', + 'getPackageReadInfo', + 'putPackageInfo', + 'prepareForOperation', + 'finalizePackage', + 'runCronJob', + 'setupPackageContainerMonitors', + ] + + for (const methodName of methodsToWrap) { + const originalMethod = this[methodName] as (...args: any[]) => Promise + + ;(this as any)[methodName] = async function (...args: any[]) { + return promiseTimeout( + originalMethod.call(this, ...args), + INNER_ACTION_TIMEOUT, + (duration) => + `Timeout after ${duration} ms in ${methodName} for Accessor "${ + this.accessorId + }". Context: ${JSON.stringify({ + type: type, + accessor: this._accessor, + content: this._content, + })}` + ) + } + } + } /** * A string that can identify the package. diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts index fa3dadda..cedea4da 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts @@ -168,7 +168,7 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso } ) - monitorInProgress._reportStatus(StatusCode.UNKNOWN, { + monitorInProgress._setStatus('setup', StatusCode.UNKNOWN, { user: 'Setting up file watcher...', tech: `Setting up file watcher...`, }) @@ -179,8 +179,8 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso } if (options.usePolling) { chokidarOptions.usePolling = true - chokidarOptions.interval = 2000 - chokidarOptions.binaryInterval = 2000 + chokidarOptions.interval = options.usePolling + chokidarOptions.binaryInterval = options.usePolling } if (options.awaitWriteFinishStabilityThreshold) { chokidarOptions.awaitWriteFinish = { @@ -220,20 +220,22 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso for (let [filePath, version] of seenFiles.entries()) { // Update the version if (!version) { + const fullPath = path.join(this.folderPath, filePath) try { - const fullPath = path.join(this.folderPath, filePath) const stat = await fsStat(fullPath) version = this.convertStatToVersion(stat) seenFiles.set(filePath, version) + + monitorInProgress._unsetStatus(fullPath) } catch (err) { version = null this.worker.logger.error( - `GenericFileAccessorHandle.setupPackagesMonitor: Unexpected Exception cautght: ${stringifyError( + `GenericFileAccessorHandle.setupPackagesMonitor: Unexpected Exception caught: ${stringifyError( err )}` ) - monitorInProgress._reportStatus(StatusCode.BAD, { + monitorInProgress._setStatus(fullPath, StatusCode.BAD, { user: 'Error when accessing watched file', tech: `Error: ${stringifyError(err)}`, }) @@ -290,6 +292,16 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso arguments: [packageContainerExp.id, monitorId, packages], }) + if (options.warningLimit && seenFiles.size > options.warningLimit) { + monitorInProgress._setStatus('warningLimit', StatusCode.WARNING_MAJOR, { + user: 'Warning: Too many files for monitor', + tech: `There are ${seenFiles.size} files in the folder, which might cause performance issues. Reduce the number of files to below ${options.warningLimit} to get rid of this warning.`, + }) + } else { + monitorInProgress._unsetStatus('warningLimit') + } + + // Finally triggerSendUpdateIsRunning = false if (triggerSendUpdateRunAgain) triggerSendUpdate() })().catch((err) => { @@ -330,6 +342,8 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso .catch(() => { // The file truly doesn't exist + monitorInProgress._unsetStatus(fullPath) + const localPath = getFilePath(fullPath) if (localPath) { seenFiles.delete(localPath) @@ -343,16 +357,17 @@ export abstract class GenericFileAccessorHandle extends GenericAccesso err )}` ) - monitorInProgress._reportStatus(StatusCode.BAD, { + monitorInProgress._setStatus('watcher', StatusCode.BAD, { user: 'Error in file watcher', tech: `chokidar error: ${stringifyError(err)}`, }) }) .on('ready', () => { - monitorInProgress._reportStatus(StatusCode.GOOD, { + monitorInProgress._setStatus('setup', StatusCode.GOOD, { user: 'File watcher is set up', tech: `File watcher is set up`, }) + triggerSendUpdate() }) return monitorInProgress diff --git a/shared/packages/worker/src/worker/lib/monitorInProgress.ts b/shared/packages/worker/src/worker/lib/monitorInProgress.ts index 02e42a60..3d60f056 100644 --- a/shared/packages/worker/src/worker/lib/monitorInProgress.ts +++ b/shared/packages/worker/src/worker/lib/monitorInProgress.ts @@ -12,6 +12,8 @@ export declare interface IMonitorInProgress { stop: () => Promise } export class MonitorInProgress extends HelpfulEventEmitter implements IMonitorInProgress { + private statuses: Map = new Map() + private lastReportedStatus: { status: StatusCode; reason: Reason } | undefined = undefined constructor(public properties: MonitorProperties, private _onStop: () => Promise) { super() } @@ -19,22 +21,52 @@ export class MonitorInProgress extends HelpfulEventEmitter implements IMonitorIn return this._onStop() } - // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types - _reportStatus(status: StatusCode, reason: Reason): void { - this.emit('status', status, reason) + _setStatus(internalId: string, status: StatusCode, reason: Reason): void { + this.statuses.set(internalId, { status, reason }) + + this._reportStatus() + } + _unsetStatus(internalId: string): void { + this.statuses.delete(internalId) + + this._reportStatus() + } + private _reportStatus(): void { + // Emit the worst status: + let worstStatus: { status: StatusCode; reason: Reason } | undefined = undefined + for (const status of this.statuses.values()) { + if (!worstStatus || worstStatus.status < status.status) { + worstStatus = status + } + } + + if (!worstStatus) + worstStatus = { + status: StatusCode.UNKNOWN, + reason: { + user: 'Not yet initialized', + tech: 'Not yet initialized', + }, + } + + if (this.lastReportedStatus?.status !== worstStatus.status) { + this.lastReportedStatus = worstStatus + + this.emit('status', worstStatus.status, worstStatus.reason) + } } /** Convenience function which calls the function that sets up the monitor */ setup(fcn: () => Promise | void): MonitorInProgress { setTimeout(() => { try { Promise.resolve(fcn()).catch((err) => { - this._reportStatus(StatusCode.BAD, { + this._setStatus('setup', StatusCode.BAD, { user: 'Internal error when setting up monitor', tech: `Error: ${stringifyError(err)}`, }) }) } catch (err) { - this._reportStatus(StatusCode.BAD, { + this._setStatus('setup', StatusCode.BAD, { user: 'Internal error when setting up monitor', tech: `Error: ${stringifyError(err)}`, }) diff --git a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib.ts b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib.ts index 44e77e6c..06a9dddc 100644 --- a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib.ts +++ b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib.ts @@ -17,6 +17,8 @@ import { ReturnTypeDoYouSupportExpectation, assertNever, AccessorId, + promiseTimeout, + INNER_ACTION_TIMEOUT, } from '@sofie-package-manager/api' import { LocalFolderAccessorHandle } from '../../../accessorHandlers/localFolder' import { FileShareAccessorHandle } from '../../../accessorHandlers/fileShare' @@ -139,128 +141,176 @@ export async function lookupAccessorHandles( expectationWorkOptions: unknown, checks: LookupChecks ): Promise> { - /** undefined if all good, error string otherwise */ - let errorReason: undefined | Reason = { user: 'No target found', tech: 'No target found' } - - // See if the file is available at any of the targets: - for (const { packageContainer, accessorId, accessor } of prioritizeAccessors(packageContainers)) { - errorReason = undefined - - const handle = getAccessorHandle( - worker, - accessorId, - accessor, - expectationContent, - expectationWorkOptions - ) + const prioritizedAccessors = prioritizeAccessors(packageContainers) + + return promiseTimeout>( + (async () => { + /** undefined if all good, error string otherwise */ + let errorReason: undefined | Reason = { user: 'No target found', tech: 'No target found' } + + // See if the file is available at any of the targets: + for (const { packageContainer, accessorId, accessor } of prioritizedAccessors) { + errorReason = undefined + + const handle = getAccessorHandle( + worker, + accessorId, + accessor, + expectationContent, + expectationWorkOptions + ) - if (checks.read) { - // Check that the accessor-handle supports reading: - const readResult = handle.checkHandleRead() - if (!readResult.success) { - errorReason = { - user: `There is an issue with the configuration for the PackageContainer "${ - packageContainer.label - }" (on accessor "${accessor.label || accessorId}"): ${readResult.reason.user}`, - tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ - readResult.reason.tech - }`, + if (checks.read) { + // Check that the accessor-handle supports reading: + const readResult = handle.checkHandleRead() + if (!readResult.success) { + errorReason = { + user: `There is an issue with the configuration for the PackageContainer "${ + packageContainer.label + }" (on accessor "${accessor.label || accessorId}"): ${readResult.reason.user}`, + tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ + readResult.reason.tech + }`, + } + continue // Maybe next accessor works? + } } - continue // Maybe next accessor works? - } - } - if (checks.readPackage) { - // Check that the Package can be read: - const readResult = await handle.checkPackageReadAccess() - if (!readResult.success) { - errorReason = { - user: `Can't read the Package from PackageContainer "${packageContainer.label}" (on accessor "${ - accessor.label || accessorId - }"), due to: ${readResult.reason.user}`, - tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ - readResult.reason.tech - }`, + if (checks.readPackage) { + // Check that the Package can be read: + const readResult = await promiseTimeout( + handle.checkPackageReadAccess(), + INNER_ACTION_TIMEOUT, + (duration) => + `Timeout after ${duration} ms in lookupAccessorHandles->checkPackageReadAccess for Accessor "${accessorId}", ${JSON.stringify( + { + expectationContent, + expectationWorkOptions, + } + )}` + ) + if (!readResult.success) { + errorReason = { + user: `Can't read the Package from PackageContainer "${ + packageContainer.label + }" (on accessor "${accessor.label || accessorId}"), due to: ${readResult.reason.user}`, + tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ + readResult.reason.tech + }`, + } + + continue // Maybe next accessor works? + } } + if (checks.packageVersion !== undefined) { + // Check that the version of the Package is correct: - continue // Maybe next accessor works? - } - } - if (checks.packageVersion !== undefined) { - // Check that the version of the Package is correct: - const actualSourceVersion = await handle.getPackageActualVersion() - - const compareVersionResult = compareActualExpectVersions(actualSourceVersion, checks.packageVersion) - if (!compareVersionResult.success) { - errorReason = { - user: `Won't read from the package, due to: ${compareVersionResult.reason.user}`, - tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ - compareVersionResult.reason.tech - }`, + const actualSourceVersion = await promiseTimeout( + handle.getPackageActualVersion(), + INNER_ACTION_TIMEOUT, + (duration) => + `Timeout after ${duration} ms in lookupAccessorHandles->getPackageActualVersion for Accessor "${accessorId}", ${JSON.stringify( + { + expectationContent, + expectationWorkOptions, + } + )}` + ) + + const compareVersionResult = compareActualExpectVersions(actualSourceVersion, checks.packageVersion) + if (!compareVersionResult.success) { + errorReason = { + user: `Won't read from the package, due to: ${compareVersionResult.reason.user}`, + tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ + compareVersionResult.reason.tech + }`, + } + continue // Maybe next accessor works? + } } - continue // Maybe next accessor works? - } - } - if (checks.write) { - // Check that the accessor-handle supports writing: - const writeResult = handle.checkHandleWrite() - if (!writeResult.success) { - errorReason = { - user: `There is an issue with the configuration for the PackageContainer "${ - packageContainer.label - }" (on accessor "${accessor.label || accessorId}"): ${writeResult.reason.user}`, - tech: `${packageContainer.label}: lookupTargets: Accessor "${accessor.label || accessorId}": ${ - writeResult.reason.tech - }`, + if (checks.write) { + // Check that the accessor-handle supports writing: + const writeResult = handle.checkHandleWrite() + if (!writeResult.success) { + errorReason = { + user: `There is an issue with the configuration for the PackageContainer "${ + packageContainer.label + }" (on accessor "${accessor.label || accessorId}"): ${writeResult.reason.user}`, + tech: `${packageContainer.label}: lookupTargets: Accessor "${ + accessor.label || accessorId + }": ${writeResult.reason.tech}`, + } + continue // Maybe next accessor works? + } } - continue // Maybe next accessor works? - } - } - if (checks.writePackageContainer) { - // Check that it is possible to write to write to the package container: - const writeAccessResult = await handle.checkPackageContainerWriteAccess() - if (!writeAccessResult.success) { - errorReason = { - user: `Can't write to the PackageContainer "${packageContainer.label}" (on accessor "${ - accessor.label || accessorId - }"), due to: ${writeAccessResult.reason.user}`, - tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ - writeAccessResult.reason.tech - }`, + if (checks.writePackageContainer) { + // Check that it is possible to write to write to the package container: + + const writeAccessResult = await promiseTimeout( + handle.checkPackageContainerWriteAccess(), + INNER_ACTION_TIMEOUT, + (duration) => + `Timeout after ${duration} ms in lookupAccessorHandles->checkPackageContainerWriteAccess for Accessor "${accessorId}", ${JSON.stringify( + { + expectationContent, + expectationWorkOptions, + } + )}` + ) + + if (!writeAccessResult.success) { + errorReason = { + user: `Can't write to the PackageContainer "${packageContainer.label}" (on accessor "${ + accessor.label || accessorId + }"), due to: ${writeAccessResult.reason.user}`, + tech: `${packageContainer.label}: Accessor "${accessor.label || accessorId}": ${ + writeAccessResult.reason.tech + }`, + } + continue // Maybe next accessor works? + } } - continue // Maybe next accessor works? - } - } - if (typeof checks.customCheck === 'function') { - const checkResult = checks.customCheck(packageContainer, accessorId, accessor) - if (!checkResult.success) { - errorReason = { - user: checkResult.reason.user, - tech: checkResult.reason.tech, + if (typeof checks.customCheck === 'function') { + const checkResult = checks.customCheck(packageContainer, accessorId, accessor) + if (!checkResult.success) { + errorReason = { + user: checkResult.reason.user, + tech: checkResult.reason.tech, + } + continue // Maybe next accessor works? + } } - continue // Maybe next accessor works? - } - } - if (!errorReason) { - // All good, no need to look further: + if (!errorReason) { + // All good, no need to look further: + return { + accessor: accessor, + handle: handle, + ready: true, + // reason: `Can access target "${packageContainer.label}" through accessor "${ + // accessor.label || accessorId + // }"`, + } + } + } return { - accessor: accessor, - handle: handle, - ready: true, - // reason: `Can access target "${packageContainer.label}" through accessor "${ - // accessor.label || accessorId - // }"`, + accessor: undefined, + ready: false, + reason: errorReason, } - } - } - return { - accessor: undefined, - ready: false, - reason: errorReason, - } + })(), + INNER_ACTION_TIMEOUT, + (duration) => + `Timeout after ${duration} ms in lookupAccessorHandles. (${ + prioritizedAccessors.length + } prioritizedAccessors, ${JSON.stringify({ + expectationContent, + expectationWorkOptions, + checks, + })})` + ) } /** Converts a diff to some kind of user-readable string */ diff --git a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib/scan.ts b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib/scan.ts index 62dd34f4..245daa10 100644 --- a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib/scan.ts +++ b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/lib/scan.ts @@ -1,5 +1,5 @@ import { execFile, ChildProcess, spawn } from 'child_process' -import { Expectation, assertNever, Accessor, AccessorOnPackage } from '@sofie-package-manager/api' +import { Expectation, assertNever, Accessor, AccessorOnPackage, LoggerInstance } from '@sofie-package-manager/api' import { isQuantelClipAccessorHandle, isLocalFolderAccessorHandle, @@ -227,7 +227,8 @@ export function scanMoreInfo( onProgress: ( /** Progress, goes from 0 to 1 */ progress: number - ) => void + ) => void, + logger: LoggerInstance ): CancelablePromise<{ scenes: number[] freezes: ScanAnomaly[] @@ -310,7 +311,10 @@ export function scanMoreInfo( ffMpegProcess.stderr.on('data', (data: any) => { const stringData = data.toString() - if (typeof stringData !== 'string') return + if (typeof stringData !== 'string') { + logger.warn(`FFMpeg: bad stderr data (${typeof stringData})`) + return + } try { const frameRegex = /^frame= +\d+/g diff --git a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageDeepScan.ts b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageDeepScan.ts index 8623b1e7..33d6a1a6 100644 --- a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageDeepScan.ts +++ b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageDeepScan.ts @@ -136,7 +136,7 @@ export const PackageDeepScan: ExpectationWindowsHandler = { if (!lookupTarget.ready) throw new Error(`Can't start working due to target: ${lookupTarget.reason.tech}`) let currentProcess: CancelablePromise | undefined - const workInProgress = new WorkInProgress({ workLabel: 'Scanning file' }, async () => { + const workInProgress = new WorkInProgress({ workLabel: 'Deep Scanning file' }, async () => { // On cancel currentProcess?.cancel() }).do(async () => { @@ -185,9 +185,15 @@ export const PackageDeepScan: ExpectationWindowsHandler = { let resultFreezes: ScanAnomaly[] = [] let resultScenes: number[] = [] if (hasVideoStream) { - currentProcess = scanMoreInfo(sourceHandle, ffProbeScan, exp.endRequirement.version, (progress) => { - workInProgress._reportProgress(sourceVersionHash, 0.21 + 0.77 * progress) - }) + currentProcess = scanMoreInfo( + sourceHandle, + ffProbeScan, + exp.endRequirement.version, + (progress) => { + workInProgress._reportProgress(sourceVersionHash, 0.21 + 0.77 * progress) + }, + worker.logger.category('scanMoreInfo') + ) const result = await currentProcess resultBlacks = result.blacks resultFreezes = result.freezes diff --git a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageLoudnessScan.ts b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageLoudnessScan.ts index 90e5962c..2aa926be 100644 --- a/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageLoudnessScan.ts +++ b/shared/packages/worker/src/worker/workers/windowsWorker/expectationHandlers/packageLoudnessScan.ts @@ -135,7 +135,7 @@ export const PackageLoudnessScan: ExpectationWindowsHandler = { if (!lookupTarget.ready) throw new Error(`Can't start working due to target: ${lookupTarget.reason.tech}`) let currentProcess: CancelablePromise | undefined - const workInProgress = new WorkInProgress({ workLabel: 'Scanning file' }, async () => { + const workInProgress = new WorkInProgress({ workLabel: 'Scanning file (loudness)' }, async () => { // On cancel currentProcess?.cancel() }).do(async () => { diff --git a/shared/packages/worker/src/workerAgent.ts b/shared/packages/worker/src/workerAgent.ts index 2d1da1b1..75dfca01 100644 --- a/shared/packages/worker/src/workerAgent.ts +++ b/shared/packages/worker/src/workerAgent.ts @@ -478,13 +478,14 @@ export class WorkerAgent { currentJob.timeoutInterval = null return } + const timeSinceLastUpdate = Date.now() - currentJob.lastUpdated - if (Date.now() - currentJob.lastUpdated > timeout) { + if (timeSinceLastUpdate > timeout) { // The job seems to have timed out. // Expectation Manager will clean up on it's side, we have to do the same here. this.logger.warn( - `WorkerAgent: Cancelling job "${currentJob.workInProgress?.properties.workLabel}" (${currentJob.wipId}) due to timeout (${timeout})` + `WorkerAgent: Cancelling job "${currentJob.workInProgress?.properties.workLabel}" (${currentJob.wipId}) due to timeout (${timeSinceLastUpdate} > ${timeout})` ) if (currentJob.timeoutInterval) { clearInterval(currentJob.timeoutInterval) diff --git a/tests/internal-tests/src/__mocks__/fs.ts b/tests/internal-tests/src/__mocks__/fs.ts index 2159a06b..9e112f53 100644 --- a/tests/internal-tests/src/__mocks__/fs.ts +++ b/tests/internal-tests/src/__mocks__/fs.ts @@ -11,13 +11,6 @@ const wndMock = wndMock0 as any as WNDMockType /* eslint-disable no-console */ const DEBUG_LOG = false -enum fsConstants { - F_OK = 0, - X_OK = 1, - W_OK = 2, - R_OK = 4, -} - const fs: any = jest.createMockFromModule('fs') type MockAny = MockDirectory | MockFile @@ -201,7 +194,11 @@ export function __printAllFiles(): string { strs.push(`${indent}${name}/`) strs.push(getPaths(file, indent + ' ')) } else { - strs.push(`${indent}${name}: size: ${file.size}`) + strs.push( + `${indent}${name}: size: ${file.size} (${file.accessRead ? 'read' : ''} ${ + file.accessWrite ? 'write' : '' + })` + ) } } return strs.join('\n') @@ -291,6 +288,14 @@ export function __emitter(): EventEmitter { } fs.__emitter = __emitter +export enum constants { + F_OK = 0, + X_OK = 1, + W_OK = 2, + R_OK = 4, +} +fs.constants = constants + export function stat(path: string, callback: (error: any, result?: any) => void): void { path = fixPath(path) if (DEBUG_LOG) console.log('fs.stat', path) @@ -313,23 +318,34 @@ export function stat(path: string, callback: (error: any, result?: any) => void) fs.stat = stat export function access(path: string, mode: number | undefined, callback: (error: any, result?: any) => void): void { + if (mode === undefined) + throw new Error( + `Mock fs.access: Don't use mode===undefined in Package Manager (or perhaps the mock fs constants aren't setup correctly?)` + ) path = fixPath(path) - if (DEBUG_LOG) console.log('fs.access', path, mode) + const mockFile = getMock(path) + // if (DEBUG_LOG) console.log('fs.access', path, mode) fsMockEmitter.emit('access', path, mode) - try { - const mockFile = getMock(path) - if (mode === fsConstants.R_OK && !mockFile.accessRead) { - return callback({ someError: 'Mock: read access denied ' }) - } else if (mode === fsConstants.W_OK && !mockFile.accessWrite) { - return callback({ someError: 'Mock: write access denied ' }) - } else { - return callback(undefined, null) + setTimeout(() => { + try { + if (mode === constants.R_OK && !mockFile.accessRead) { + return callback({ someError: 'Mock: read access denied ' }) + } else if (mode === constants.W_OK && !mockFile.accessWrite) { + return callback({ someError: 'Mock: write access denied ' }) + } else { + return callback(undefined, null) + } + } catch (err) { + callback(err) } - } catch (err) { - callback(err) - } + }, FS_ACCESS_DELAY) } fs.access = access +let FS_ACCESS_DELAY = 0 +export function __mockSetAccessDelay(delay: number): void { + FS_ACCESS_DELAY = delay +} +fs.__mockSetAccessDelay = __mockSetAccessDelay export function unlink(path: string, callback: (error: any, result?: any) => void): void { path = fixPath(path) diff --git a/tests/internal-tests/src/__tests__/issues.spec.ts b/tests/internal-tests/src/__tests__/issues.spec.ts index e41e1f23..5667ff60 100644 --- a/tests/internal-tests/src/__tests__/issues.spec.ts +++ b/tests/internal-tests/src/__tests__/issues.spec.ts @@ -10,6 +10,7 @@ import { PackageContainerId, literal, protectString, + INNER_ACTION_TIMEOUT, } from '@sofie-package-manager/api' import type * as fsMockType from '../__mocks__/fs' import { prepareTestEnviromnent, TestEnviromnent } from './lib/setupEnv' @@ -28,16 +29,14 @@ const fsAccess = promisify(fs.access) const fsStat = promisify(fs.stat) const fsExists = async (filePath: string) => { - let exists = false try { - await fsAccess(filePath, undefined) + await fsAccess(filePath, fs.constants.R_OK) // The file exists - exists = true + return true } catch (err) { - // Ignore + if (typeof err === 'object' && err && (err as any).code === 'ENOENT') return false + throw err } - - return exists } // const fsStat = promisify(fs.stat) @@ -63,7 +62,7 @@ describeForAllPlatforms( expect(fs.lstat).toBeTruthy() expect(fs.__mockReset).toBeTruthy() - jest.setTimeout(env.WAIT_JOB_TIME * 10 + env.WAIT_SCAN_TIME * 2) + jest.setTimeout(env.WAIT_JOB_TIME_SAFE * 10 + env.WAIT_SCAN_TIME * 2) }) afterAll(() => { env.terminate() @@ -71,6 +70,10 @@ describeForAllPlatforms( beforeEach(() => { fs.__mockReset() env.reset() + fs.__mockSetAccessDelay(0) // Reset any access delay + }) + afterEach(() => { + fs.__mockSetAccessDelay(0) // Reset any access delay }) }, (_platform: string) => { @@ -89,13 +92,13 @@ describeForAllPlatforms( expect(env.expectationStatuses[EXP_copy0]).toMatchObject({ actualVersionHash: null, statusInfo: { - status: /new|waiting/, + status: expect.stringMatching(/new|waiting/), statusReason: { - tech: /not able to access file/i, + tech: expect.stringMatching(/not able to access file/i), }, }, }) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) expect(env.containerStatuses[TARGET0].packages[PACKAGE0].packageStatus?.status).toEqual( ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.NOT_FOUND @@ -112,13 +115,46 @@ describeForAllPlatforms( expect(env.containerStatuses[TARGET0].packages[PACKAGE0].packageStatus?.status).toEqual( ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.READY ) - }, env.WAIT_SCAN_TIME + env.ERROR_WAIT_TIME + env.WAIT_JOB_TIME) + }, env.WAIT_SCAN_TIME + env.ERROR_WAIT_TIME + env.WAIT_JOB_TIME_SAFE) expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual('fulfilled') expect(await fsStat('/targets/target0/file0Target.mp4')).toMatchObject({ size: 1234, }) }) + test('Slow responding file operations', async () => { + fs.__mockSetDirectory('/sources/source0/') + fs.__mockSetDirectory('/targets/target0') + fs.__mockSetFile('/sources/source0/file0Source.mp4', 1234) + fs.__mockSetAccessDelay(INNER_ACTION_TIMEOUT + 100) // Simulate a slow file operation + + env.setLogFilterFunction((level, ...args) => { + const str = args.join(',') + // Suppress some logged warnings: + if (level === 'warn' && str.includes('checkPackageContainerWriteAccess')) return false + return true + }) + + addCopyFileExpectation( + env, + EXP_copy0, + [getLocalSource(SOURCE0, 'file0Source.mp4')], + [getLocalTarget(TARGET0, 'file0Target.mp4')] + ) + + await waitUntil(() => { + // Expect the Expectation to be waiting: + expect(env.expectationStatuses[EXP_copy0]).toMatchObject({ + actualVersionHash: null, + statusInfo: { + // status: expect.stringMatching(/fulfilled/), + statusReason: { + tech: expect.stringMatching(/timeout.*checkPackageContainerWriteAccess.*Accessor.*/i), + }, + }, + }) + }, INNER_ACTION_TIMEOUT + 100) + }) test.skip('Wait for non-existing network-shared, file', async () => { // To be written @@ -146,7 +182,6 @@ describeForAllPlatforms( accessRead: true, accessWrite: false, }) - // fs.__printAllFiles() addCopyFileExpectation( env, @@ -160,13 +195,13 @@ describeForAllPlatforms( expect(env.expectationStatuses[EXP_copy0]).toMatchObject({ actualVersionHash: null, statusInfo: { - status: /new|waiting/, + status: expect.stringMatching(/new|waiting/), statusReason: { - tech: /not able to access file/i, + tech: expect.stringMatching(/not able to access file/i), }, }, }) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) // Now the file can be read from: fs.__mockSetFile('/sources/source0/file0Source.mp4', 1234) @@ -174,13 +209,13 @@ describeForAllPlatforms( await waitTime(env.WAIT_SCAN_TIME) await waitUntil(() => { - // Expect the Expectation to be waiting: + // Expect the Expectation to be waiting -> new: expect(env.expectationStatuses[EXP_copy0]).toMatchObject({ actualVersionHash: null, statusInfo: { status: 'new', statusReason: { - tech: /not able to access target/i, + tech: expect.stringMatching(/Not able to write to container folder.*write access denied/i), }, }, }) @@ -199,7 +234,7 @@ describeForAllPlatforms( expect(env.containerStatuses[TARGET0].packages[PACKAGE0].packageStatus?.status).toEqual( ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.READY ) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual('fulfilled') expect(await fsStat('/targets/target0/file0Target.mp4')).toMatchObject({ @@ -251,6 +286,14 @@ describeForAllPlatforms( fs.__emitter().once('copyFile', listenToCopyFile) + env.setLogFilterFunction((level, ...args) => { + const str = args.join(',') + // Suppress some logged warnings: + if (level === 'warn' && str.includes('stalled, restarting')) return false + if (level === 'error' && str.includes('cancelling timed out work')) return false + return true + }) + addCopyFileExpectation( env, EXP_copy0, @@ -269,7 +312,7 @@ describeForAllPlatforms( expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual( expect.stringMatching(/new|waiting/) ) - }, env.WORK_TIMEOUT_TIME + env.WAIT_JOB_TIME) + }, env.WORK_TIMEOUT_TIME + env.WAIT_JOB_TIME_SAFE) // Add another worker: env.addWorker() @@ -313,6 +356,14 @@ describeForAllPlatforms( }) fs.__emitter().once('copyFile', listenToCopyFile) + env.setLogFilterFunction((level, ...args) => { + const str = args.join(',') + // Suppress some logged warnings: + if (level === 'warn' && str.includes('stalled, restarting')) return false + if (level === 'warn' && str.includes('Cancelling job')) return false + return true + }) + addCopyFileExpectation( env, EXP_copy0, @@ -332,7 +383,7 @@ describeForAllPlatforms( expect(env.expectationStatuses[EXP_copy0].statusInfo.status).toEqual( expect.stringMatching(/new|waiting|ready|fulfilled/) ) - }, env.WORK_TIMEOUT_TIME + env.WAIT_JOB_TIME) + }, env.WORK_TIMEOUT_TIME + env.WAIT_JOB_TIME_SAFE) // Wait for the copy to complete: await waitUntil(() => { @@ -427,19 +478,21 @@ describeForAllPlatforms( expect(env.containerStatuses[TARGET0].packages[PACKAGE0].packageStatus?.status).toEqual( ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.READY ) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) await waitUntil(() => { expect(env.containerStatuses[TARGET1]).toBeTruthy() expect(env.containerStatuses[TARGET1].packages[PACKAGE0]).toBeTruthy() expect(env.containerStatuses[TARGET1].packages[PACKAGE0].packageStatus?.status).toEqual( ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.READY ) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) - // Check that step 1 and 2 fullfills: + // Check that step 1 and 2 fulfills: expect(env.expectationStatuses[STEP1].statusInfo.status).toEqual('fulfilled') expect(env.expectationStatuses[STEP2].statusInfo.status).toEqual('fulfilled') + expect(await fsExists('/targets/target0/myFolder/file0Target.mp4')).toBe(true) + expect(await fsStat('/targets/target0/myFolder/file0Target.mp4')).toMatchObject({ size: 1234, }) @@ -456,13 +509,13 @@ describeForAllPlatforms( expect(env.containerStatuses[TARGET0].packages[PACKAGE0].packageStatus?.status).toEqual( ExpectedPackageStatusAPI.PackageContainerPackageStatusStatus.NOT_FOUND ) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) // Step 2 should be un-fulfilled, since it depends on step 1. await waitUntil(() => { expect(env.expectationStatuses[STEP1].statusInfo.status).toMatch(/waiting|new/) expect(env.expectationStatuses[STEP2].statusInfo.status).toMatch(/waiting|new/) - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) // The step1-copied file should remain, since removePackageOnUnFulfill is not set expect(await fsExists('/targets/target0/myFolder/file0Target.mp4')).toBe(true) @@ -476,7 +529,7 @@ describeForAllPlatforms( await waitUntil(() => { expect(env.expectationStatuses[STEP1].statusInfo.status).toBe('fulfilled') expect(env.expectationStatuses[STEP2].statusInfo.status).toBe('fulfilled') - }, env.WAIT_JOB_TIME) + }, env.WAIT_JOB_TIME_SAFE) }) } ) diff --git a/tests/internal-tests/src/__tests__/lib/setupEnv.ts b/tests/internal-tests/src/__tests__/lib/setupEnv.ts index 5b7553ec..93a2b9aa 100644 --- a/tests/internal-tests/src/__tests__/lib/setupEnv.ts +++ b/tests/internal-tests/src/__tests__/lib/setupEnv.ts @@ -108,10 +108,11 @@ export async function setupExpectationManager( debugLogging: boolean, workerCount: number = 1, callbacks: ExpectationManagerCallbacks, - options?: ExpectationManagerOptions + options: ExpectationManagerOptions, + logFilterFunction: (level: string, ...args: any[]) => boolean ) { const logLevel = debugLogging ? LogLevel.DEBUG : LogLevel.WARN - const logger = setupLogger(config, '', undefined, undefined, logLevel) + const logger = setupLogger(config, '', undefined, undefined, logLevel, logFilterFunction) const expectationManager = new ExpectationManager( logger, @@ -205,6 +206,14 @@ export async function prepareTestEnviromnent(debugLogging: boolean): Promise boolean = () => { + return true // Default behavior: no filtering + } + let logFilterFunction = (level: string, ...args: any[]) => logFilterFunctionInner(level, ...args) + const setLogFilterFunction = (filter: (level: string, ...args: any[]) => boolean) => { + logFilterFunctionInner = filter + } + const em = await setupExpectationManager( config, debugLogging, @@ -290,11 +299,13 @@ export async function prepareTestEnviromnent(debugLogging: boolean): Promise true) em.expectationManager.resetWork() objectKeys(expectationStatuses).forEach((key: ExpectationId) => { delete expectationStatuses[key] @@ -325,10 +337,13 @@ export async function prepareTestEnviromnent(debugLogging: boolean): Promise void addWorker: () => Promise removeWorker: (id: WorkerAgentId) => Promise + setLogFilterFunction: (filter: (level: string, ...args: any[]) => boolean) => void } export type ExpectationStatuses = Record<