diff --git a/apps/appcontainer-node/packages/generic/src/appContainer.ts b/apps/appcontainer-node/packages/generic/src/appContainer.ts
index 4021d705..05e22d7c 100644
--- a/apps/appcontainer-node/packages/generic/src/appContainer.ts
+++ b/apps/appcontainer-node/packages/generic/src/appContainer.ts
@@ -223,13 +223,26 @@ export class AppContainer {
 				const app = this.apps.get(clientId)
 				if (app) app.lastPing = Date.now()
 			},
-			requestSpinDown: async (): Promise<void> => {
+			requestSpinDown: async (force?: boolean): Promise<void> => {
 				const app = this.apps.get(clientId)
-				if (!app || !app.isAutoScaling) return
-				if (this.getAutoScalingAppCount(app.appType) > this.config.appContainer.minRunningApps) {
-					this.spinDown(clientId, `Requested by app`).catch((error) => {
+				if (!app) return
+
+				if (force) {
+					// The Worker is forcefully asking to be spun down.
+					this.spinDown(clientId, `Forced by app`).catch((error) => {
 						this.logger.error(`Error when spinning down app "${clientId}": ${stringifyError(error)}`)
 					})
+					// Note: this.monitorApps() will soon spin up another Worker if needed.
+				} else {
+					// The Worker is kindly asking to be spun down.
+					// The AppContainer will determine whether it should be spun down.
+
+					if (!app.isAutoScaling) return
+					if (this.getAutoScalingAppCount(app.appType) > this.config.appContainer.minRunningApps) {
+						this.spinDown(clientId, `Requested by app`).catch((error) => {
+							this.logger.error(`Error when spinning down app "${clientId}": ${stringifyError(error)}`)
+						})
+					}
 				}
 			},
 			workerStorageWriteLock: async (
@@ -298,6 +311,12 @@ export class AppContainer {
 			this.config.appContainer.worker.networkIds.length
 				? `--networkIds=${this.config.appContainer.worker.networkIds.join(';')}`
 				: '',
+			this.config.appContainer.worker.failurePeriodLimit
+				? `--failurePeriodLimit=${this.config.appContainer.worker.failurePeriodLimit}`
+				: '',
+			this.config.appContainer.worker.failurePeriod
+				? `--failurePeriod=${this.config.appContainer.worker.failurePeriod}`
+				: '',
 		]
 	}
 	if (
diff --git a/apps/worker/packages/generic/src/index.ts b/apps/worker/packages/generic/src/index.ts
index 9052fe2d..ea70da71 100644
--- a/apps/worker/packages/generic/src/index.ts
+++ b/apps/worker/packages/generic/src/index.ts
@@ -14,12 +14,12 @@ export async function startProcess(): Promise<void> {
 	const processHandler = new ProcessHandler(logger)
 	processHandler.init(config.process)

-	const workforce = new WorkerAgent(logger, config)
+	const workerAgent = new WorkerAgent(logger, config)

 	process.on('exit', (code) => {
 		logger.info(`Worker: Closing with exitCode: ${code}`)
-		workforce.terminate()
+		workerAgent.terminate()
 	})

-	workforce.init().catch(logger.error)
+	workerAgent.init().catch(logger.error)
 }
diff --git a/shared/packages/api/src/appContainer.ts b/shared/packages/api/src/appContainer.ts
index 1a5e3f84..9eae871a 100644
--- a/shared/packages/api/src/appContainer.ts
+++ b/shared/packages/api/src/appContainer.ts
@@ -24,6 +24,8 @@ export interface AppContainerConfig {
 		windowsDriveLetters: WorkerAgentConfig['windowsDriveLetters']
 		costMultiplier: number
 		considerCPULoad: number | null
+		failurePeriodLimit: number
+		failurePeriod: number
 	}
 }
diff --git a/shared/packages/api/src/config.ts b/shared/packages/api/src/config.ts
index 7e214e7d..f686711d 100644
--- a/shared/packages/api/src/config.ts
+++ b/shared/packages/api/src/config.ts
@@ -169,6 +169,17 @@ const workerArguments = defineArguments({
 		default: process.env.WORKER_PICK_UP_CRITICAL_EXPECTATIONS_ONLY === '1' || false,
 		describe: 'If set to 1, the worker will only pick up expectations that are marked as critical for playout.',
 	},
+	failurePeriodLimit: {
+		type: 'number',
+		default: parseInt(process.env.WORKER_FAILURE_PERIOD_LIMIT || '', 10) || 0,
+		describe:
+			'If set, the worker will count the periods of time in which it encounters errors while working, and will restart once the number of consecutive periods with errors exceeds this limit.',
+	},
+	failurePeriod: {
+		type: 'number',
+		default: parseInt(process.env.WORKER_FAILURE_PERIOD || '', 10) || 5 * 60 * 1000,
+		describe: 'The duration of each period of time used by "failurePeriodLimit" (in milliseconds).',
+	},
 })
 /** CLI-argument-definitions for the AppContainer process */
 const appContainerArguments = defineArguments({
@@ -240,6 +251,17 @@ const appContainerArguments = defineArguments({
 		describe:
 			'If set, the worker will consider the CPU load of the system it runs on before it accepts jobs. Set to a value between 0 and 1, the worker will accept jobs if the CPU load is below the configured value.',
 	},
+	failurePeriodLimit: {
+		type: 'number',
+		default: parseInt(process.env.WORKER_FAILURE_PERIOD_LIMIT || '', 10) || 0,
+		describe:
+			'If set, the worker will count the periods of time in which it encounters errors while working, and will restart once the number of consecutive periods with errors exceeds this limit.',
+	},
+	failurePeriod: {
+		type: 'number',
+		default: parseInt(process.env.WORKER_FAILURE_PERIOD || '', 10) || 5 * 60 * 1000,
+		describe: 'The duration of each period of time used by "failurePeriodLimit" (in milliseconds).',
+	},
 })
 /** CLI-argument-definitions for the "Single" process */
 const singleAppArguments = defineArguments({
@@ -426,6 +448,8 @@ export interface WorkerConfig {
 		costMultiplier: number
 		considerCPULoad: number | null
 		pickUpCriticalExpectationsOnly: boolean
+		failurePeriodLimit: number
+		failurePeriod: number
 	} & WorkerAgentConfig
 }
 export async function getWorkerConfig(): Promise<WorkerConfig> {
@@ -452,6 +476,12 @@ export async function getWorkerConfig(): Promise<WorkerConfig> {
 				(typeof argv.considerCPULoad === 'string' ? parseFloat(argv.considerCPULoad) : argv.considerCPULoad) ||
 				null,
 			pickUpCriticalExpectationsOnly: argv.pickUpCriticalExpectationsOnly,
+			failurePeriodLimit:
+				(typeof argv.failurePeriodLimit === 'string'
+					? parseInt(argv.failurePeriodLimit)
+					: argv.failurePeriodLimit) || 0,
+			failurePeriod:
+				(typeof argv.failurePeriod === 'string' ? parseInt(argv.failurePeriod) : argv.failurePeriod) || 0,
 		},
 	}
 }
@@ -491,6 +521,12 @@ export async function getAppContainerConfig(): Promise<AppContainerConfig> {
 			costMultiplier:
 				(typeof argv.costMultiplier === 'string' ? parseFloat(argv.costMultiplier) : argv.costMultiplier) || 1,
 			considerCPULoad:
 				(typeof argv.considerCPULoad === 'string' ? parseFloat(argv.considerCPULoad) : argv.considerCPULoad) ||
 				null,
+			failurePeriodLimit:
+				(typeof argv.failurePeriodLimit === 'string'
+					? parseInt(argv.failurePeriodLimit)
+					: argv.failurePeriodLimit) || 0,
+			failurePeriod:
+				(typeof argv.failurePeriod === 'string' ? parseInt(argv.failurePeriod) : argv.failurePeriod) || 0,
 		},
 	}
 }
diff --git a/shared/packages/api/src/methods.ts b/shared/packages/api/src/methods.ts
--- a/shared/packages/api/src/methods.ts
+++ b/shared/packages/api/src/methods.ts
@@ -118,5 +118,5 @@ export namespace AppContainerWorkerAgent {
 	ping: () => Promise<void>
-	requestSpinDown: () => Promise<void>
+	requestSpinDown: (force?: boolean) => Promise<void>
 	/** Acquire a write lock, the returned id is then used in workerStorageWrite to write */
 	workerStorageWriteLock: (dataId: DataId, customTimeout?: number) => Promise<{ lockId: LockId; current: any }>
 	workerStorageReleaseLock: (dataId: DataId, lockId: LockId) => Promise<void>
diff --git a/shared/packages/worker/src/appContainerApi.ts b/shared/packages/worker/src/appContainerApi.ts
index e12dd780..9bdc73e0 100644
--- a/shared/packages/worker/src/appContainerApi.ts
+++ b/shared/packages/worker/src/appContainerApi.ts
@@ -23,8 +23,8 @@ export class AppContainerAPI
 	async ping(): Promise<void> {
 		return this._sendMessage('ping')
 	}
-	async requestSpinDown(): Promise<void> {
-		return this._sendMessage('requestSpinDown')
+	async requestSpinDown(force?: boolean): Promise<void> {
+		return this._sendMessage('requestSpinDown', force)
 	}
 	async workerStorageWriteLock(
 		dataId: DataId,
diff --git a/shared/packages/worker/src/workerAgent.ts b/shared/packages/worker/src/workerAgent.ts
index faf0d254..ad45904f 100644
--- a/shared/packages/worker/src/workerAgent.ts
+++ b/shared/packages/worker/src/workerAgent.ts
@@ -79,6 +79,9 @@ export class WorkerAgent {
 	private spinDownTime = 0
 	private intervalCheckTimer: NodeJS.Timeout | null = null
 	private lastWorkTime = 0
+	private failureCounter = 0
+	private failurePeriodCounter = 0
+	private intervalFailureTimer: NodeJS.Timeout | null = null
 	private activeMonitors: Map<PackageContainerId, Map<MonitorId, MonitorInProgress>> = new Map()
 	private initWorkForceAPIPromise?: { resolve: () => void; reject: (reason?: any) => void }
 	private initAppContainerAPIPromise?: { resolve: () => void; reject: (reason?: any) => void }
@@ -253,6 +256,8 @@ export class WorkerAgent {

 		// Wait for this.workforceAPI to be ready before continuing:
 		await pWorkForce
+		this.setupIntervalErrorCheck()
+		this.IDidSomeWork()
 	}

 	terminate(): void {
@@ -261,6 +266,8 @@ export class WorkerAgent {
 		this.terminated = true
 		this.workforceAPI.terminate()
+		if (this.intervalFailureTimer) clearInterval(this.intervalFailureTimer)
+
 		for (const expectationManager of this.expectationManagers.values()) {
 			expectationManager.api.terminate()
 		}
@@ -545,6 +552,7 @@ export class WorkerAgent {
 				this.IDidSomeWork()
 				if (job.cancelled) return // Don't send updates on cancelled work
 				job.lastUpdated = Date.now()
+				this.IFailed()
 				this.removeJob(job)
 				this.logger.warn(
 					`Worker "${this.id}" stopped job ${job.wipId}, (${exp.id}), due to error: (${
@@ -764,7 +772,7 @@ export class WorkerAgent {
 		// Wrap the methods, so that we can cut off communication upon termination: (this is used in tests)
 		for (const key of Object.keys(methods) as Array<keyof typeof methods>) {
 			const fcn = methods[key] as any
-			methods[key] = ((...args: any[]) => {
+			methods[key] = (async (...args: any[]) => {
 				if (this.terminated)
 					return new Promise((_resolve, reject) => {
 						// Simulate a timed out message:
@@ -772,7 +780,7 @@ export class WorkerAgent {
 						reject('Timeout')
 					}, 200)
 				})
-			return fcn(...args)
+			return this.trackException(fcn(...args))
 		}) as any
 	}
 	// Connect to the ExpectationManager:
@@ -793,6 +801,13 @@ export class WorkerAgent {
 		await expectationManager.api.init(connectionOptions, methods)
 	}

+	private async trackException<T>(fnc: Promise<T>): Promise<T> {
+		fnc.catch(() => {
+			this.IFailed()
+		})
+		return fnc
+	}
+
 	private async updateListOfExpectationManagers(newExpectationManagers: { id: ExpectationManagerId; url: string }[]) {
 		const ids = new Set()
 		for (const newEm of newExpectationManagers) {
@@ -854,16 +869,7 @@ export class WorkerAgent {

 		if (!this.activeMonitors.size) {
 			this.logger.debug(`Worker: is idle, requesting spinning down`)
-			if (this.appContainerAPI.connected) {
-				this.appContainerAPI.requestSpinDown().catch((err) => {
-					this.logger.error(`Worker: appContainerAPI.requestSpinDown failed: ${stringifyError(err)}`)
-				})
-			} else {
-				// Huh, we're not connected to the appContainer.
-				// Well, we want to spin down anyway, so we'll do it:
-				// eslint-disable-next-line no-process-exit
-				process.exit(54)
-			}
+			this.requestShutDown()
 		}
 	}
 }
@@ -875,6 +881,43 @@ export class WorkerAgent {
 			})
 		}
 	}
+	private requestShutDown(force?: boolean) {
+		if (this.appContainerAPI.connected) {
+			this.appContainerAPI.requestSpinDown(force).catch((err) => {
+				this.logger.error(`Worker: appContainerAPI.requestSpinDown failed: ${stringifyError(err)}`)
+			})
+		} else {
+			// Huh, we're not connected to the appContainer.
+			// Well, we want to spin down anyway, so we'll do it:
+			// eslint-disable-next-line no-process-exit
+			process.exit(54)
+		}
+	}
+	private setupIntervalErrorCheck() {
+		if (this.config.worker.failurePeriodLimit <= 0) return
+		if (this.intervalFailureTimer) clearInterval(this.intervalFailureTimer)
+		this.intervalFailureTimer = setInterval(() => this.intervalErrorCheck(), this.config.worker.failurePeriod)
+	}
+	private intervalErrorCheck() {
+		if (this.failureCounter === 0) {
+			// Reset the failurePeriodCounter, since there were no errors during this period.
+			this.failurePeriodCounter = 0
+			// Everything seems fine.
+			return
+		}
+
+		if (this.failureCounter > 0) {
+			this.failurePeriodCounter++
+			this.failureCounter = 0
+		}
+
+		if (this.failurePeriodCounter >= this.config.worker.failurePeriodLimit) {
+			this.logger.error(
+				`Worker: Failed failurePeriodLimit check: ${this.failurePeriodCounter} periods with errors. Requesting spin down.`
+			)
+			this.requestShutDown(true)
+		}
+	}
 	/**
 	 * To be called when some actual work has been done.
 	 * If this is not called for a certain amount of time, the worker will be considered idle and will be spun down
@@ -882,6 +925,12 @@ export class WorkerAgent {
 	private IDidSomeWork() {
 		this.lastWorkTime = Date.now()
 	}
+	/**
+	 * To be called when some work has failed.
+	 */
+	private IFailed() {
+		this.failureCounter++
+	}
 	private getNextWipId(): WorkInProgressLocalId {
 		return protectString(`${this._wipI++}`)
 	}
diff --git a/tests/internal-tests/src/__tests__/lib/setupEnv.ts b/tests/internal-tests/src/__tests__/lib/setupEnv.ts
index 35f9185e..c95129f2 100644
--- a/tests/internal-tests/src/__tests__/lib/setupEnv.ts
+++ b/tests/internal-tests/src/__tests__/lib/setupEnv.ts
@@ -83,6 +83,8 @@ const defaultTestConfig: SingleAppConfig = {
 		costMultiplier: 1,
 		considerCPULoad: null,
 		pickUpCriticalExpectationsOnly: false,
+		failurePeriod: 0,
+		failurePeriodLimit: 0,
 	},
 	quantelHTTPTransformerProxy: {
 		port: 0,
@@ -103,6 +105,8 @@ const defaultTestConfig: SingleAppConfig = {
 		windowsDriveLetters: ['X', 'Y', 'Z'],
 		costMultiplier: 1,
 		considerCPULoad: null,
+		failurePeriod: 0,
+		failurePeriodLimit: 0,
 		},
 	},
 }
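Usage notes: the restart-on-failure behaviour added above is opt-in. With `failurePeriodLimit` left at its default of 0, `setupIntervalErrorCheck()` returns early and the worker behaves as before. When it is set, `intervalErrorCheck()` runs once per `failurePeriod` milliseconds (default: 5 minutes); a period with no errors resets the counter, and once `failurePeriodLimit` consecutive periods each contained at least one error, the worker calls `requestSpinDown(true)`, which the AppContainer honours without consulting the auto-scaling limits. A sketch of starting a worker with the new options follows; the entry-point path is hypothetical, but the flags and environment variables are the ones defined in this diff:

    # Restart the worker after 3 consecutive 5-minute periods that each saw at least one error:
    node apps/worker/dist/index.js --failurePeriodLimit=3 --failurePeriod=300000

    # The same configuration via environment variables:
    WORKER_FAILURE_PERIOD_LIMIT=3 WORKER_FAILURE_PERIOD=300000 node apps/worker/dist/index.js

When workers run under an AppContainer, the same two values can instead be set on the AppContainer (the `failurePeriodLimit`/`failurePeriod` arguments added to `appContainerArguments`), which forwards them to each spun-up worker via the argument list built in `appContainer.ts`.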