Skip to content

Commit

Permalink
fix: fix an issue where workers wheren't scaled up properly when expe…
Browse files Browse the repository at this point in the history
…ctations where waiting
  • Loading branch information
nytamin committed Feb 21, 2024
1 parent c60399c commit 81f354c
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 12 deletions.
3 changes: 3 additions & 0 deletions shared/packages/expectationManager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"engines": {
"node": ">=14.18.0"
},
"devDependencies": {
"type-fest": "3.13.1"
},
"dependencies": {
"@sofie-package-manager/api": "1.50.0",
"@sofie-package-manager/worker": "1.50.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import { PartialDeep } from 'type-fest'
import {
ExpectationId,
LogLevel,
ProcessConfig,
WorkerAgentId,
initializeLogger,
literal,
protectString,
setupLogger,
} from '@sofie-package-manager/api'
import { WorkerScaler } from '../workerScaler'
import { InternalManager } from '../../../internalManager/internalManager'
import { TrackedWorkerAgent } from '../../../internalManager/lib/trackedWorkerAgents'
import { ExpectationTracker } from '../../expectationTracker'
import { TrackedExpectation } from '../../../lib/trackedExpectation'
import { ExpectedPackageStatusAPI } from '@sofie-automation/shared-lib/dist/package-manager/package'

// ---------------------------------------------------------
const SCALE_UP_COUNT = 1
const SCALE_UP_TIME = 10
let isThereExistingWorkers = false
// ---------------------------------------------------------

const logLevel = LogLevel.WARN
const config = {
process: literal<ProcessConfig>({
logPath: undefined,
logLevel: undefined,
unsafeSSL: false,
certificates: [],
}),
}
initializeLogger(config)
const logger = setupLogger(config, '', undefined, undefined, logLevel)
logger.warn = jest.fn(logger.warn) as any
logger.error = jest.fn(logger.error) as any

const requestResourcesForExpectation = jest.fn(async () => false)

const fakeManager = literal<PartialDeep<InternalManager>>({
workforceConnection: {
workforceAPI: {
requestResourcesForExpectation,
},
},
workerAgents: {
list: (): { workerId: WorkerAgentId; workerAgent: TrackedWorkerAgent }[] => {
if (isThereExistingWorkers)
return [
{
workerId: protectString('worker0'),
workerAgent: {} as any as TrackedWorkerAgent,
},
]
else return []
},
},
}) as any as InternalManager

const fakeTracker = literal<PartialDeep<ExpectationTracker>>({
constants: {
SCALE_UP_COUNT,
SCALE_UP_TIME,
},
trackedExpectations: {
list: (): TrackedExpectation[] => {
return expectations
},
},
trackedExpectationAPI: {
isExpectationWaitingForOther: (_exp): TrackedExpectation | null => {
return null
},
},
getTrackedPackageContainers: () => {
return []
},
}) as any as ExpectationTracker
let expectations: TrackedExpectation[] = []
function setExpectations(
from: {
id: string
state: ExpectedPackageStatusAPI.WorkStatusState
hasAvailableWorkers: boolean
noWorkerAssignedTime?: number
}[]
) {
expectations = Array.from(from).map((e): TrackedExpectation => {
return literal<PartialDeep<TrackedExpectation>>({
id: protectString<ExpectationId>(e.id),
state: e.state,
noWorkerAssignedTime: e.noWorkerAssignedTime ?? null,
availableWorkers: new Set<WorkerAgentId>(e.hasAvailableWorkers ? [protectString('worker0')] : []),

exp: {
statusReport: {
label: `mock${e.id}`,
},
},
}) as any as TrackedExpectation
})
}

beforeEach(() => {
isThereExistingWorkers = false
expectations = []

requestResourcesForExpectation.mockClear()
})
afterEach(() => {
expect(logger.warn).toHaveBeenCalledTimes(0)
expect(logger.error).toHaveBeenCalledTimes(0)
})

test('no expectations', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)
await scaler.checkIfNeedToScaleUp()

expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0)
})
test('1 fulfilled expectation', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = false
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED,
hasAvailableWorkers: true,
},
])

await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(0)
})

test('1 waiting expectation, no workers', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = false
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.WAITING,
hasAvailableWorkers: false,
},
])

await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})

test('1 waiting expectation', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = true
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.WAITING,
hasAvailableWorkers: true,
},
])

await scaler.checkIfNeedToScaleUp()
await sleep(SCALE_UP_TIME * 2)
await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})
test('1 expectation, not assigned to worker', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = true
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.FULFILLED,
noWorkerAssignedTime: Date.now() - 1000,
hasAvailableWorkers: true,
},
])

await scaler.checkIfNeedToScaleUp()
await sleep(SCALE_UP_TIME * 2)
await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})
test('1 expectation, no available workers', async () => {
const scaler = new WorkerScaler(logger, fakeManager, fakeTracker)

isThereExistingWorkers = true
setExpectations([
{
id: 'exp0',
state: ExpectedPackageStatusAPI.WorkStatusState.NEW,
// noWorkerAssignedTime: Date.now() - 1000,
hasAvailableWorkers: false,
},
])

await scaler.checkIfNeedToScaleUp()
await sleep(SCALE_UP_TIME * 2)
await scaler.checkIfNeedToScaleUp()
expect(fakeManager.workforceConnection.workforceAPI.requestResourcesForExpectation).toHaveBeenCalledTimes(1)
})

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,8 @@ export class WorkInProgressTracker {
},
})

if (this.tracker.trackedExpectationAPI.onExpectationFulfilled(wip.trackedExp)) {
// Something was triggered, run again asap.
// We should reevaluate asap, so that any other expectation which might be waiting on this work could start.
this.tracker.triggerEvaluationNow()
}
// Trigger another evaluation ASAP, since a worker is free now, another expectation might be able to start:
this.tracker.triggerEvaluationNow()
} else {
// Expectation not in WORKING state, ignore
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,26 @@ export class WorkerScaler {
this.waitingExpectations = []

for (const exp of this.tracker.trackedExpectations.list()) {
/** The expectation is waiting on another expectation */
const isWaitingForOther = this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp)

/** The expectation is waiting for a worker */
const isWaiting: boolean =
exp.state === ExpectedPackageStatusAPI.WorkStatusState.NEW ||
exp.state === ExpectedPackageStatusAPI.WorkStatusState.WAITING ||
exp.state === ExpectedPackageStatusAPI.WorkStatusState.READY

/** Not supported by any worker */
const notSupportedByAnyWorker: boolean = exp.availableWorkers.size === 0
const notSupportedByAnyWorker = exp.availableWorkers.size === 0

/** No worker has had time to work on it lately */
const notAssignedToAnyWorker: boolean =
const notAssignedToAnyWorkerForSomeTime: boolean =
!!exp.noWorkerAssignedTime &&
Date.now() - exp.noWorkerAssignedTime > this.tracker.constants.SCALE_UP_TIME

if (
isWaiting &&
(notSupportedByAnyWorker || notAssignedToAnyWorker) &&
!this.tracker.trackedExpectationAPI.isExpectationWaitingForOther(exp) // Filter out expectations that aren't ready to begin working on anyway
) {
// Is the expectation waiting for resources?
if (!isWaitingForOther && (isWaiting || notSupportedByAnyWorker || notAssignedToAnyWorkerForSomeTime)) {
// Add a second round of waiting, to ensure that we don't scale up prematurely:
if (!exp.waitingForWorkerTime) {
this.logger.silly(
`Starting to track how long expectation "${expLabel(exp)}" has been waiting for a worker`
Expand All @@ -84,6 +86,8 @@ export class WorkerScaler {
} else {
exp.waitingForWorkerTime = null
}

// If the expectation has been waiting for long enough:
if (exp.waitingForWorkerTime) {
const hasBeenWaitingFor = Date.now() - exp.waitingForWorkerTime
if (
Expand Down

0 comments on commit 81f354c

Please sign in to comment.