Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fix/worker-scaling' into test/1.…
Browse files Browse the repository at this point in the history
…51.0
  • Loading branch information
nytamin committed Feb 29, 2024
2 parents b13be06 + 81f354c commit 50f76b0
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 17 deletions.
4 changes: 3 additions & 1 deletion shared/packages/api/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ export function setupLogger(
})
if (initialLogLevel) setLogLevel(initialLogLevel, true)

logger.info('Logging to', logPath)
if (handleProcess) {
logger.info(`Logging to "${logPath}"`)
}
} else {
const transportConsole = new Winston.transports.Console({
level: logLevel,
Expand Down
3 changes: 3 additions & 0 deletions shared/packages/expectationManager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"engines": {
"node": ">=14.18.0"
},
"devDependencies": {
"type-fest": "3.13.1"
},
"dependencies": {
"@sofie-package-manager/api": "1.50.0",
"@sofie-package-manager/worker": "1.50.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export async function evaluateExpectationStateReady({
}
} else {
// No worker is available at the moment.
// Check if anough time has passed if it makes sense to check for new workers again:
// Check if enough time has passed if it makes sense to check for new workers again:

if (
trackedExp.noWorkerAssignedTime &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import { PartialDeep } from 'type-fest'
import {
ExpectationId,
LogLevel,
ProcessConfig,
WorkerAgentId,
initializeLogger,
literal,
protectString,
setupLogger,
} from '@sofie-package-manager/api'
import { WorkerScaler } from '../workerScaler'
import { InternalManager } from '../../../internalManager/internalManager'
import { TrackedWorkerAgent } from '../../../internalManager/lib/trackedWorkerAgents'
import { ExpectationTracker } from '../../expectationTracker'
import { TrackedExpectation } from '../../../lib/trackedExpectation'
import { ExpectedPackageStatusAPI } from '@sofie-automation/shared-lib/dist/package-manager/package'

// ---------------------------------------------------------
const SCALE_UP_COUNT = 1
const SCALE_UP_TIME = 10
let isThereExistingWorkers = false
// ---------------------------------------------------------

const logLevel = LogLevel.WARN
const config = {
process: literal<ProcessConfig>({
logPath: undefined,
logLevel: undefined,
unsafeSSL: false,
certificates: [],
}),
}
initializeLogger(config)
const logger = setupLogger(config, '', undefined, undefined, logLevel)
logger.warn = jest.fn(logger.warn) as any
logger.error = jest.fn(logger.error) as any

const requestResourcesForExpectation = jest.fn(async () => false)

const fakeManager = literal<PartialDeep<InternalManager>>({
workforceConnection: {
workforceAPI: {
requestResourcesForExpectation,
},
},
workerAgents: {
list: (): { workerId: WorkerAgentId; workerAgent: TrackedWorkerAgent }[] => {
if (isThereExistingWorkers)
return [
{
workerId: protectString('worker0'),
workerAgent: {} as any as TrackedWorkerAgent,
},
]
else return []
},
},
}) as any as InternalManager

const fakeTracker = literal<PartialDeep<ExpectationTracker>>({
constants: {
SCALE_UP_COUNT,
SCALE_UP_TIME,
},
trackedExpectations: {
list: (): TrackedExpectation[] => {
return expectations
},
},
trackedExpectationAPI: {
isExpectationWaitingForOther: (_exp): TrackedExpectation | null => {
return null
},
},
getTrackedPackageContainers: () => {
return []
},
}) as any as ExpectationTracker
let expectations: TrackedExpectation[] = []
function setExpectations(
from: {
id: string
state: ExpectedPackageStatusAPI.WorkStatusState
hasAvailableWorkers: boolean
noWorkerAssignedTime?: number
}[]
) {
expectations = Array.from(from).map((e): TrackedExpectation => {
return literal<PartialDeep<TrackedExpectation>>({
id: protectString<ExpectationId>(e.id),
state: e.state,
noWorkerAssignedTime: e.noWorkerAssignedTime ?? null,
availableWorkers: new Set<WorkerAgentId>(e.hasAvailableWorkers ? [protectString('worker0')] : []),

exp: {
statusReport: {
label: `mock${e.id}`,
},
},
}) as any as TrackedExpectation
})
}

beforeEach(() => {
isThereExistingWorkers = false
expectations = []

requestResourcesForExpectation.mockClear()
})
afterEach(() => {
expect(logger.warn).toHaveBeenCalledTimes(0)
expect(logger.error).toHaveBeenCalledTimes(0)
})

test('no expectations', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
await scaler.checkIfNeedToScaleUp()

expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0)
})
test('1 fulfilled expectation', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = false
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED,
hasAvailableWorkers: true,
},
])

await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0)
})

test('1 waiting expectation, no workers', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = false
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.WAITING,
hasAvailableWorkers: false,
},
])

await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})

test('1 waiting expectation', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = true
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.WAITING,
hasAvailableWorkers: true,
},
])

await scaler.checkIfNeedToScaleUp()
await sleep(SCALE_UP_TIME * 2)
await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})
test('1 expectation, not assigned to worker', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = true
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED,
noWorkerAssignedTime: Date.now() - 1000,
hasAvailableWorkers: true,
},
])

await scaler.checkIfNeedToScaleUp()
await sleep(SCALE_UP_TIME * 2)
await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})
test('1 expectation, no available workers', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = true
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.NEW,
// noWorkerAssignedTime: Date.now() - 1000,
hasAvailableWorkers: false,
},
])

await scaler.checkIfNeedToScaleUp()
await sleep(SCALE_UP_TIME * 2)
await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,8 @@ export class WorkInProgressTracker {
},
})

if (this.tracker.trackedExpectationAPI.onExpectationFulfilled(wip.trackedExp)) {
// Something was triggered, run again asap.
// We should reevaluate asap, so that any other expectation which might be waiting on this work could start.
this.tracker.triggerEvaluationNow()
}
// Trigger another evaluation ASAP, since a worker is free now, another expectation might be able to start:
this.tracker.triggerEvaluationNow()
} else {
// Expectation not in WORKING state, ignore
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,26 @@ export class WorkerScaler {
this.waitingExpectations = []

for (const exp of this.tracker.trackedExpectations.list()) {
/** The expectation is waiting on another expectation */
const isWaitingForOther = this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp)

/** The expectation is waiting for a worker */
const isWaiting: boolean =
exp.state === ExpectedPackageStatusAPI.WorkStatusState.NEW ||
exp.state === ExpectedPackageStatusAPI.WorkStatusState.WAITING ||
exp.state === ExpectedPackageStatusAPI.WorkStatusState.READY

/** Not supported by any worker */
const notSupportedByAnyWorker: boolean = exp.availableWorkers.size === 0
const notSupportedByAnyWorker = exp.availableWorkers.size === 0

/** No worker has had time to work on it lately */
const notAssignedToAnyWorker: boolean =
const notAssignedToAnyWorkerForSomeTime: boolean =
!!exp.noWorkerAssignedTime &&
Date.now() - exp.noWorkerAssignedTime > this.tracker.constants.SCALE_UP_TIME

if (
isWaiting &&
(notSupportedByAnyWorker || notAssignedToAnyWorker) &&
!this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) // Filter out expectations that aren't ready to begin working on anyway
) {
// Is the expectation waiting for resources?
if (!isWaitingForOther && (isWaiting || notSupportedByAnyWorker || notAssignedToAnyWorkerForSomeTime)) {
// Add a second round of waiting, to ensure that we don't scale up prematurely:
if (!exp.waitingForWorkerTime) {
this.logger.silly(
`Starting to track how long expectation "${expLabel(exp)}" has been waiting for a worker`
Expand All @@ -84,6 +86,8 @@ export class WorkerScaler {
} else {
exp.waitingForWorkerTime = null
}

// If the expectation has been waiting for long enough:
if (exp.waitingForWorkerTime) {
const hasBeenWaitingFor = Date.now() - exp.waitingForWorkerTime
if (
Expand Down
10 changes: 9 additions & 1 deletion tests/internal-tests/src/__mocks__/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import path from 'path'

const fsCopyFile = promisify(fs.copyFile)
const fsMkdir = promisify(fs.mkdir)
const fsStat = promisify(fs.stat)

const child_process: any = jest.createMockFromModule('child_process')

Expand Down Expand Up @@ -173,7 +174,13 @@ async function robocopy(spawned: SpawnedProcess, args: string[]) {
const source = path.join(sourceFolder, file)
const destination = path.join(destinationFolder, file)

await fsMkdir(destinationFolder) // robocopy automatically creates the destination folder
try {
await fsStat(destinationFolder)
} catch (e) {
if (`${e}`.match(/ENOENT/)) {
await fsMkdir(destinationFolder) // robocopy automatically creates the destination folder
} else throw e
}

await fsCopyFile(source, destination)
}
Expand Down Expand Up @@ -270,4 +277,5 @@ async function netUse(commandString: string): Promise<{ stdout: string; stderr:
return { stdout, stderr }
}
}

module.exports = child_process
Loading

0 comments on commit 50f76b0

Please sign in to comment.