feat: track failures in Workers in a time period (SOFIE-3355)
jstarpl committed Aug 14, 2024
1 parent c66b32f commit 732fa19
Showing 3 changed files with 67 additions and 15 deletions.
6 changes: 3 additions & 3 deletions apps/worker/packages/generic/src/index.ts
@@ -14,12 +14,12 @@ export async function startProcess(): Promise<void> {
const processHandler = new ProcessHandler(logger)
processHandler.init(config.process)

const workforce = new WorkerAgent(logger, config)
const workerAgent = new WorkerAgent(logger, config)

process.on('exit', (code) => {
logger.info(`Worker: Closing with exitCode: ${code}`)
workforce.terminate()
workerAgent.terminate()
})

workforce.init().catch(logger.error)
workerAgent.init().catch(logger.error)
}
9 changes: 9 additions & 0 deletions shared/packages/api/src/config.ts
@@ -169,6 +169,12 @@ const workerArguments = defineArguments({
default: process.env.WORKER_PICK_UP_CRITICAL_EXPECTATIONS_ONLY === '1' || false,
describe: 'If set to 1, the worker will only pick up expectations that are marked as critical for playout.',
},
failureLimit: {
type: 'number',
default: process.env.WORKER_FAILURE_LIMIT || 0,
describe:
'If set (> 0), the worker will count the number of failures it encounters while working and will request a restart once this number is reached within a 60 second period.',
},
})
/** CLI-argument-definitions for the AppContainer process */
const appContainerArguments = defineArguments({
@@ -426,6 +432,7 @@ export interface WorkerConfig {
costMultiplier: number
considerCPULoad: number | null
pickUpCriticalExpectationsOnly: boolean
failureLimit: number
} & WorkerAgentConfig
}
export async function getWorkerConfig(): Promise<WorkerConfig> {
@@ -452,6 +459,8 @@ export async function getWorkerConfig(): Promise<WorkerConfig> {
(typeof argv.considerCPULoad === 'string' ? parseFloat(argv.considerCPULoad) : argv.considerCPULoad) ||
null,
pickUpCriticalExpectationsOnly: argv.pickUpCriticalExpectationsOnly,
failureLimit:
(typeof argv.failureLimit === 'string' ? parseInt(argv.failureLimit) : argv.failureLimit) || 0,
},
}
}
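The new option is wired up like the existing worker arguments: it defaults to the WORKER_FAILURE_LIMIT environment variable (falling back to 0, which disables the check) and is then coerced to a number in getWorkerConfig, since values arriving from the environment are strings. A minimal sketch of that coercion, assuming the same "value || 0" fallback used above (the helper name is illustrative, not part of the codebase):

// Sketch: coerce a CLI/env failure limit into a number, with 0 meaning "disabled".
function coerceFailureLimit(raw: string | number | undefined): number {
	const parsed = typeof raw === 'string' ? parseInt(raw, 10) : raw
	// NaN, undefined and 0 all collapse to 0, which turns the failure check off.
	return parsed || 0
}

coerceFailureLimit('5') // 5
coerceFailureLimit(undefined) // 0 - feature disabled
coerceFailureLimit('not-a-number') // 0 - feature disabled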
67 changes: 55 additions & 12 deletions shared/packages/worker/src/workerAgent.ts
@@ -79,6 +79,8 @@ export class WorkerAgent {
private spinDownTime = 0
private intervalCheckTimer: NodeJS.Timeout | null = null
private lastWorkTime = 0
private failureCounter = 0
private intervalFailureTimer: NodeJS.Timeout | null = null
private activeMonitors: Map<PackageContainerId, Map<MonitorId, MonitorInProgress>> = new Map()
private initWorkForceAPIPromise?: { resolve: () => void; reject: (reason?: any) => void }
private initAppContainerAPIPromise?: { resolve: () => void; reject: (reason?: any) => void }
@@ -253,6 +255,8 @@
// Wait for this.workforceAPI to be ready before continuing:
await pWorkForce

this.setupIntervalErrorCheck()

this.IDidSomeWork()
}
terminate(): void {
@@ -261,6 +265,8 @@
this.terminated = true
this.workforceAPI.terminate()

if (this.intervalFailureTimer) clearInterval(this.intervalFailureTimer)

for (const expectationManager of this.expectationManagers.values()) {
expectationManager.api.terminate()
}
@@ -545,6 +551,7 @@
this.IDidSomeWork()
if (job.cancelled) return // Don't send updates on cancelled work
job.lastUpdated = Date.now()
this.IFailed()
this.removeJob(job)
this.logger.warn(
`Worker "${this.id}" stopped job ${job.wipId}, (${exp.id}), due to error: (${
@@ -764,15 +771,15 @@
// Wrap the methods, so that we can cut off communication upon termination: (this is used in tests)
for (const key of Object.keys(methods) as Array<keyof Omit<ExpectationManagerWorkerAgent.WorkerAgent, 'id'>>) {
const fcn = methods[key] as any
methods[key] = ((...args: any[]) => {
methods[key] = (async (...args: any[]) => {
if (this.terminated)
return new Promise((_resolve, reject) => {
// Simulate a timed out message:
setTimeout(() => {
reject('Timeout')
}, 200)
})
return fcn(...args)
return this.trackException(fcn(...args))
}) as any
}
// Connect to the ExpectationManager:
@@ -793,6 +800,14 @@
await expectationManager.api.init(connectionOptions, methods)
}

private async trackException<ReturnType>(fnc: Promise<ReturnType>): Promise<ReturnType> {
fnc.catch((reason) => {
this.IFailed()
throw reason
})
return fnc
}

private async updateListOfExpectationManagers(newExpectationManagers: { id: ExpectationManagerId; url: string }[]) {
const ids = new Set<ExpectationManagerId>()
for (const newEm of newExpectationManagers) {
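The hunk above is where failures from remote calls actually get counted: every method exposed to the ExpectationManager is wrapped in an async closure, and the resulting promise is passed through trackException, which calls IFailed() on rejection while still letting the rejection reach the caller. A self-contained sketch of the same idea, using a standalone class and try/await/catch rather than the promise.catch() form (all names here are illustrative, not taken from the codebase):

// Sketch: count rejected promises without changing what callers observe.
class FailureTracker {
	private failureCounter = 0

	get failures(): number {
		return this.failureCounter
	}

	// Await the work; on rejection, bump the counter and re-throw the same error.
	async track<T>(work: Promise<T>): Promise<T> {
		try {
			return await work
		} catch (err) {
			this.failureCounter++
			throw err
		}
	}
}

// Hypothetical usage: the caller still sees the rejection, but it has been counted.
const tracker = new FailureTracker()
tracker
	.track(Promise.reject(new Error('copy failed')))
	.catch(() => console.log(`failures so far: ${tracker.failures}`)) // failures so far: 1

Counting in one shared place keeps the wrapped methods thin, and any method added to the API later is tracked automatically.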
@@ -854,16 +869,7 @@
if (!this.activeMonitors.size) {
this.logger.debug(`Worker: is idle, requesting spinning down`)

if (this.appContainerAPI.connected) {
this.appContainerAPI.requestSpinDown().catch((err) => {
this.logger.error(`Worker: appContainerAPI.requestSpinDown failed: ${stringifyError(err)}`)
})
} else {
// Huh, we're not connected to the appContainer.
// Well, we want to spin down anyway, so we'll do it:
// eslint-disable-next-line no-process-exit
process.exit(54)
}
this.requestShutDown()
}
}
}
@@ -875,13 +881,48 @@
})
}
}
private requestShutDown() {
if (this.appContainerAPI.connected) {
this.appContainerAPI.requestSpinDown().catch((err) => {
this.logger.error(`Worker: appContainerAPI.requestSpinDown failed: ${stringifyError(err)}`)
})
} else {
// Huh, we're not connected to the appContainer.
// Well, we want to spin down anyway, so we'll do it:
// eslint-disable-next-line no-process-exit
process.exit(54)
}
}
private setupIntervalErrorCheck() {
if (this.config.worker.failureLimit <= 0) return
if (this.intervalFailureTimer) clearInterval(this.intervalFailureTimer)
this.intervalFailureTimer = setInterval(() => this.intervalErrorCheck(), FAILURE_CHECK_INTERVAL)
}
private intervalErrorCheck() {
if (this.config.worker.failureLimit >= 0 && this.failureCounter < this.config.worker.failureLimit) {
// reset the failureCounter when the interval elapses and it doesn't cross the threshold
this.failureCounter = 0
return
} else {
this.logger.error(
`Worker: Failed failureLimit check: ${this.failureCounter} errors in a ${FAILURE_CHECK_INTERVAL}ms window. Requesting spin down.`
)
this.requestShutDown()
}
}
/**
* To be called when some actual work has been done.
* If this is not called for a certain amount of time, the worker will be considered idle and will be spun down
*/
private IDidSomeWork() {
this.lastWorkTime = Date.now()
}
/**
* To be called when some work has failed
*/
private IFailed() {
this.failureCounter++
}
private getNextWipId(): WorkInProgressLocalId {
return protectString<WorkInProgressLocalId>(`${this._wipI++}`)
}
@@ -895,3 +936,5 @@ interface CurrentJob {
wipId: WorkInProgressLocalId
workInProgress: IWorkInProgress | null
}

const FAILURE_CHECK_INTERVAL = 60 * 1000
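Taken together, the tracking is a tumbling 60-second window: IFailed() is called both when a running job is stopped due to an error and when a wrapped ExpectationManager call rejects, and every FAILURE_CHECK_INTERVAL the timer either resets the counter (still below failureLimit) or requests a spin-down via the AppContainer, falling back to process.exit(54) when not connected. A condensed sketch of that loop with the shutdown reduced to a callback (class and parameter names are illustrative):

// Sketch of the tumbling-window failure check, assuming a fixed 60 s window.
const CHECK_INTERVAL = 60 * 1000

class FailureWindow {
	private failureCounter = 0
	private timer: NodeJS.Timeout | null = null

	constructor(private readonly failureLimit: number, private readonly onLimitReached: () => void) {}

	start(): void {
		if (this.failureLimit <= 0) return // 0 (the default) disables the check entirely
		if (this.timer) clearInterval(this.timer)
		this.timer = setInterval(() => this.check(), CHECK_INTERVAL)
	}
	stop(): void {
		if (this.timer) clearInterval(this.timer)
		this.timer = null
	}
	reportFailure(): void {
		this.failureCounter++
	}
	private check(): void {
		if (this.failureCounter < this.failureLimit) {
			// Still under the limit: start a fresh window.
			this.failureCounter = 0
		} else {
			this.onLimitReached()
		}
	}
}

// Hypothetical usage: ask for a restart after 5 failures within any single window.
const failureWindow = new FailureWindow(5, () => process.exit(54))
failureWindow.start()

As in intervalErrorCheck above, the counter is only reset when a window ends below the limit, so a burst of failures is judged against the window in which it happened rather than a rolling average.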
