Merge pull request #197 from nrkno/feat/worker-track-errors/sofie-3355
feat: track failures in Workers in a time period (SOFIE-3355)
jstarpl authored Aug 28, 2024
2 parents e576a66 + de0f43b commit 9ac2762
Showing 8 changed files with 132 additions and 22 deletions.
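
In short: each Worker now counts job failures within fixed-length periods (failurePeriod, default 5 minutes) and, once the number of consecutive periods containing failures reaches failurePeriodLimit, it asks its AppContainer for a forced spin-down so that it gets restarted. Below is a minimal TypeScript sketch of that counting logic; the class and callback names are illustrative, the actual implementation is in WorkerAgent.intervalErrorCheck() further down in this diff.

// Minimal sketch of the failure-period tracking introduced by this commit.
// Names (FailureTracker, onLimitExceeded) are illustrative, not the actual API.
class FailureTracker {
	private failureCount = 0 // failures seen in the current period
	private badPeriodCount = 0 // consecutive periods that contained at least one failure

	constructor(
		private readonly periodMs: number, // corresponds to --failurePeriod
		private readonly periodLimit: number, // corresponds to --failurePeriodLimit (0 disables the check)
		private readonly onLimitExceeded: () => void // e.g. request a forced spin-down
	) {
		if (this.periodLimit > 0) {
			setInterval(() => this.checkPeriod(), this.periodMs)
		}
	}

	reportFailure(): void {
		this.failureCount++
	}

	private checkPeriod(): void {
		if (this.failureCount === 0) {
			// A clean period resets the streak
			this.badPeriodCount = 0
			return
		}
		this.badPeriodCount++
		this.failureCount = 0
		if (this.badPeriodCount >= this.periodLimit) {
			this.onLimitExceeded()
		}
	}
}
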
27 changes: 23 additions & 4 deletions apps/appcontainer-node/packages/generic/src/appContainer.ts
@@ -223,13 +223,26 @@ export class AppContainer {
const app = this.apps.get(clientId)
if (app) app.lastPing = Date.now()
},
requestSpinDown: async (): Promise<void> => {
requestSpinDown: async (force?: boolean): Promise<void> => {
const app = this.apps.get(clientId)
if (!app || !app.isAutoScaling) return
if (this.getAutoScalingAppCount(app.appType) > this.config.appContainer.minRunningApps) {
this.spinDown(clientId, `Requested by app`).catch((error) => {
if (!app) return

if (force) {
// The Worker is forcefully asking to be spun down.
this.spinDown(clientId, `Forced by app`).catch((error) => {
this.logger.error(`Error when spinning down app "${clientId}": ${stringifyError(error)}`)
})
// Note: this.monitorApps() will soon spin up another Worker if needed
} else {
// The Worker is kindly asking to be spun down.
// The appcontainer will determine if it should be spun down.

if (!app.isAutoScaling) return
if (this.getAutoScalingAppCount(app.appType) > this.config.appContainer.minRunningApps) {
this.spinDown(clientId, `Requested by app`).catch((error) => {
this.logger.error(`Error when spinning down app "${clientId}": ${stringifyError(error)}`)
})
}
}
},
workerStorageWriteLock: async (
@@ -298,6 +311,12 @@ export class AppContainer {
this.config.appContainer.worker.networkIds.length
? `--networkIds=${this.config.appContainer.worker.networkIds.join(';')}`
: '',
this.config.appContainer.worker.failurePeriodLimit
? `--failurePeriodLimit=${this.config.appContainer.worker.failurePeriodLimit}`
: '',
this.config.appContainer.worker.failurePeriod
? `--failurePeriod=${this.config.appContainer.worker.failurePeriod}`
: '',
]
}
if (
6 changes: 3 additions & 3 deletions apps/worker/packages/generic/src/index.ts
@@ -14,12 +14,12 @@ export async function startProcess(): Promise<void> {
const processHandler = new ProcessHandler(logger)
processHandler.init(config.process)

const workforce = new WorkerAgent(logger, config)
const workerAgent = new WorkerAgent(logger, config)

process.on('exit', (code) => {
logger.info(`Worker: Closing with exitCode: ${code}`)
workforce.terminate()
workerAgent.terminate()
})

workforce.init().catch(logger.error)
workerAgent.init().catch(logger.error)
}
2 changes: 2 additions & 0 deletions shared/packages/api/src/appContainer.ts
@@ -24,6 +24,8 @@ export interface AppContainerConfig {
windowsDriveLetters: WorkerAgentConfig['windowsDriveLetters']
costMultiplier: number
considerCPULoad: number | null
failurePeriodLimit: number
failurePeriod: number
}
}

36 changes: 36 additions & 0 deletions shared/packages/api/src/config.ts
@@ -169,6 +169,17 @@ const workerArguments = defineArguments({
default: process.env.WORKER_PICK_UP_CRITICAL_EXPECTATIONS_ONLY === '1' || false,
describe: 'If set to 1, the worker will only pick up expectations that are marked as critical for playout.',
},
failurePeriodLimit: {
type: 'number',
default: parseInt(process.env.WORKER_FAILURE_PERIOD_LIMIT || '', 10) || 0,
describe:
'If set, the worker will count periods of time in which it encounters errors while working, and will restart once the number of consecutive failing periods exceeds this limit.',
},
failurePeriod: {
type: 'number',
default: parseInt(process.env.WORKER_FAILURE_PERIOD || '', 10) || 5 * 60 * 1000,
describe: 'This is the period of time used by "failurePeriodLimit" (milliseconds)',
},
})
/** CLI-argument-definitions for the AppContainer process */
const appContainerArguments = defineArguments({
@@ -240,6 +251,17 @@ const appContainerArguments = defineArguments({
describe:
'If set, the worker will consider the CPU load of the system it runs on before it accepts jobs. Set to a value between 0 and 1, the worker will accept jobs if the CPU load is below the configured value.',
},
failurePeriodLimit: {
type: 'number',
default: parseInt(process.env.WORKER_FAILURE_PERIOD_LIMIT || '', 10) || 0,
describe:
'If set, the worker will count periods of time in which it encounters errors while working, and will restart once the number of consecutive failing periods exceeds this limit.',
},
failurePeriod: {
type: 'number',
default: parseInt(process.env.WORKER_FAILURE_PERIOD || '', 10) || 5 * 60 * 1000,
describe: 'This is the period of time used by "failurePeriodLimit" (milliseconds)',
},
})
/** CLI-argument-definitions for the "Single" process */
const singleAppArguments = defineArguments({
@@ -426,6 +448,8 @@ export interface WorkerConfig {
costMultiplier: number
considerCPULoad: number | null
pickUpCriticalExpectationsOnly: boolean
failurePeriodLimit: number
failurePeriod: number
} & WorkerAgentConfig
}
export async function getWorkerConfig(): Promise<WorkerConfig> {
@@ -452,6 +476,12 @@ export async function getWorkerConfig(): Promise<WorkerConfig> {
(typeof argv.considerCPULoad === 'string' ? parseFloat(argv.considerCPULoad) : argv.considerCPULoad) ||
null,
pickUpCriticalExpectationsOnly: argv.pickUpCriticalExpectationsOnly,
failurePeriodLimit:
(typeof argv.failurePeriodLimit === 'string'
? parseInt(argv.failurePeriodLimit)
: argv.failurePeriodLimit) || 0,
failurePeriod:
(typeof argv.failurePeriod === 'string' ? parseInt(argv.failurePeriod) : argv.failurePeriod) || 0,
},
}
}
@@ -491,6 +521,12 @@ export async function getAppContainerConfig(): Promise<AppContainerProcessConfig
(typeof argv.considerCPULoad === 'string'
? parseFloat(argv.considerCPULoad)
: argv.considerCPULoad) || null,
failurePeriodLimit:
(typeof argv.failurePeriodLimit === 'string'
? parseInt(argv.failurePeriodLimit)
: argv.failurePeriodLimit) || 0,
failurePeriod:
(typeof argv.failurePeriod === 'string' ? parseInt(argv.failurePeriod) : argv.failurePeriod) || 0,
},
},
}
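
For reference, both new options can be supplied as CLI arguments (--failurePeriodLimit, --failurePeriod) or environment variables (WORKER_FAILURE_PERIOD_LIMIT, WORKER_FAILURE_PERIOD). A small sketch of how the defaults above resolve; the executable name in the comment is hypothetical.

// How the new options resolve, based on the defaults defined above.
// An unset env var parses to NaN, so the `|| fallback` applies.
const failurePeriodLimit = parseInt(process.env.WORKER_FAILURE_PERIOD_LIMIT || '', 10) || 0 // 0 = check disabled
const failurePeriod = parseInt(process.env.WORKER_FAILURE_PERIOD || '', 10) || 5 * 60 * 1000 // default: 5 minutes

// Equivalent CLI usage (hypothetical invocation; the actual executable name may differ):
//   worker --failurePeriodLimit=3 --failurePeriod=300000
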
2 changes: 1 addition & 1 deletion shared/packages/api/src/methods.ts
@@ -271,7 +271,7 @@ export namespace AppContainerWorkerAgent {
id: WorkerAgentId

ping: () => Promise<void>
requestSpinDown: () => Promise<void>
requestSpinDown: (force?: boolean) => Promise<void>
/** Acquire a write lock, the returned id is then used in workerStorageWrite to write */
workerStorageWriteLock: (dataId: DataId, customTimeout?: number) => Promise<{ lockId: LockId; current: any }>
workerStorageReleaseLock: (dataId: DataId, lockId: LockId) => Promise<void>
4 changes: 2 additions & 2 deletions shared/packages/worker/src/appContainerApi.ts
@@ -23,8 +23,8 @@ export class AppContainerAPI
async ping(): Promise<void> {
return this._sendMessage('ping')
}
async requestSpinDown(): Promise<void> {
return this._sendMessage('requestSpinDown')
async requestSpinDown(force?: boolean): Promise<void> {
return this._sendMessage('requestSpinDown', force)
}
async workerStorageWriteLock(
dataId: DataId,
73 changes: 61 additions & 12 deletions shared/packages/worker/src/workerAgent.ts
@@ -79,6 +79,9 @@ export class WorkerAgent {
private spinDownTime = 0
private intervalCheckTimer: NodeJS.Timeout | null = null
private lastWorkTime = 0
private failureCounter = 0
private failurePeriodCounter = 0
private intervalFailureTimer: NodeJS.Timeout | null = null
private activeMonitors: Map<PackageContainerId, Map<MonitorId, MonitorInProgress>> = new Map()
private initWorkForceAPIPromise?: { resolve: () => void; reject: (reason?: any) => void }
private initAppContainerAPIPromise?: { resolve: () => void; reject: (reason?: any) => void }
@@ -253,6 +256,8 @@
// Wait for this.workforceAPI to be ready before continuing:
await pWorkForce

this.setupIntervalErrorCheck()

this.IDidSomeWork()
}
terminate(): void {
@@ -261,6 +266,8 @@
this.terminated = true
this.workforceAPI.terminate()

if (this.intervalFailureTimer) clearInterval(this.intervalFailureTimer)

for (const expectationManager of this.expectationManagers.values()) {
expectationManager.api.terminate()
}
@@ -545,6 +552,7 @@
this.IDidSomeWork()
if (job.cancelled) return // Don't send updates on cancelled work
job.lastUpdated = Date.now()
this.IFailed()
this.removeJob(job)
this.logger.warn(
`Worker "${this.id}" stopped job ${job.wipId}, (${exp.id}), due to error: (${
@@ -764,15 +772,15 @@
// Wrap the methods, so that we can cut off communication upon termination: (this is used in tests)
for (const key of Object.keys(methods) as Array<keyof Omit<ExpectationManagerWorkerAgent.WorkerAgent, 'id'>>) {
const fcn = methods[key] as any
methods[key] = ((...args: any[]) => {
methods[key] = (async (...args: any[]) => {
if (this.terminated)
return new Promise((_resolve, reject) => {
// Simulate a timed out message:
setTimeout(() => {
reject('Timeout')
}, 200)
})
return fcn(...args)
return this.trackException(fcn(...args))
}) as any
}
// Connect to the ExpectationManager:
@@ -793,6 +801,13 @@
await expectationManager.api.init(connectionOptions, methods)
}

private async trackException<ReturnType>(fnc: Promise<ReturnType>): Promise<ReturnType> {
fnc.catch(() => {
this.IFailed()
})
return fnc
}

private async updateListOfExpectationManagers(newExpectationManagers: { id: ExpectationManagerId; url: string }[]) {
const ids = new Set<ExpectationManagerId>()
for (const newEm of newExpectationManagers) {
@@ -854,16 +869,7 @@
if (!this.activeMonitors.size) {
this.logger.debug(`Worker: is idle, requesting spinning down`)

if (this.appContainerAPI.connected) {
this.appContainerAPI.requestSpinDown().catch((err) => {
this.logger.error(`Worker: appContainerAPI.requestSpinDown failed: ${stringifyError(err)}`)
})
} else {
// Huh, we're not connected to the appContainer.
// Well, we want to spin down anyway, so we'll do it:
// eslint-disable-next-line no-process-exit
process.exit(54)
}
this.requestShutDown()
}
}
}
Expand All @@ -875,13 +881,56 @@ export class WorkerAgent {
})
}
}
private requestShutDown(force?: boolean) {
if (this.appContainerAPI.connected) {
this.appContainerAPI.requestSpinDown(force).catch((err) => {
this.logger.error(`Worker: appContainerAPI.requestSpinDown failed: ${stringifyError(err)}`)
})
} else {
// Huh, we're not connected to the appContainer.
// Well, we want to spin down anyway, so we'll do it:
// eslint-disable-next-line no-process-exit
process.exit(54)
}
}
private setupIntervalErrorCheck() {
if (this.config.worker.failurePeriodLimit <= 0) return
if (this.intervalFailureTimer) clearInterval(this.intervalFailureTimer)
this.intervalFailureTimer = setInterval(() => this.intervalErrorCheck(), this.config.worker.failurePeriod)
}
private intervalErrorCheck() {
if (this.failureCounter === 0) {
// reset the failurePeriodCounter when there were no exceptions in the period
this.failurePeriodCounter = 0
// everything seems fine
return
}

if (this.failureCounter > 0) {
this.failurePeriodCounter++
this.failureCounter = 0
}

if (this.failurePeriodCounter >= this.config.worker.failurePeriodLimit) {
this.logger.error(
`Worker: Failed failurePeriodLimit check: ${this.failurePeriodCounter} periods with errors. Requesting spin down.`
)
this.requestShutDown(true)
}
}
/**
* To be called when some actual work has been done.
* If this is not called for a certain amount of time, the worker will be considered idle and will be spun down
*/
private IDidSomeWork() {
this.lastWorkTime = Date.now()
}
/**
* To be called when some work has failed
*/
private IFailed() {
this.failureCounter++
}
private getNextWipId(): WorkInProgressLocalId {
return protectString<WorkInProgressLocalId>(`${this._wipI++}`)
}
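
Failures are counted in two ways in the Worker: explicitly via IFailed() when a job stops due to an error, and implicitly by wrapping every method exposed to the ExpectationManager so that any rejected promise is counted (trackException above). A reduced sketch of that wrapping pattern, with illustrative names (wrapWithFailureTracking, countFailure):

// Sketch of the promise-wrapping pattern used above: each exposed method is replaced
// by a wrapper that counts a failure whenever the underlying call rejects,
// without swallowing the rejection for the caller.
type AnyMethod = (...args: any[]) => Promise<any>

function wrapWithFailureTracking(
	methods: Record<string, AnyMethod>,
	countFailure: () => void // e.g. something like () => workerAgent.IFailed()
): Record<string, AnyMethod> {
	const wrapped: Record<string, AnyMethod> = {}
	for (const key of Object.keys(methods)) {
		const original = methods[key]
		wrapped[key] = async (...args: any[]) => {
			const promise = original(...args)
			promise.catch(() => countFailure()) // track the failure...
			return promise // ...but still let the caller handle the rejection
		}
	}
	return wrapped
}
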
4 changes: 4 additions & 0 deletions tests/internal-tests/src/__tests__/lib/setupEnv.ts
@@ -83,6 +83,8 @@ const defaultTestConfig: SingleAppConfig = {
costMultiplier: 1,
considerCPULoad: null,
pickUpCriticalExpectationsOnly: false,
failurePeriod: 0,
failurePeriodLimit: 0,
},
quantelHTTPTransformerProxy: {
port: 0,
@@ -103,6 +105,8 @@
windowsDriveLetters: ['X', 'Y', 'Z'],
costMultiplier: 1,
considerCPULoad: null,
failurePeriod: 0,
failurePeriodLimit: 0,
},
},
}
