Skip to content

Commit

Permalink
Merge pull request #90 from tv2/develop
Browse files Browse the repository at this point in the history
Merge develop to staging
  • Loading branch information
temk-tv2 authored Nov 22, 2023
2 parents 8fbaea6 + 6821631 commit a5a22f8
Show file tree
Hide file tree
Showing 14 changed files with 2,058 additions and 2,344 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,16 @@
"@sofie-automation/shared-lib": "npm:@tv2media/[email protected]",
"@tv2media/logger": "^1.2.4",
"@types/dotenv": "^6.1.1",
"@types/koa": "^2.13.10",
"@types/koa-router": "^7.4.6",
"@types/xml2js": "^0.4.4",
"async-mutex": "^0.3.1",
"cross-env": "^7.0.3",
"dotenv": "^8.0.0",
"inews": "npm:@tv2media/[email protected]",
"jobs-queue": "git+https://github.com/sparkpunkd/node-jobs-queue.git#master",
"koa": "^2.14.2",
"koa-router": "^12.0.1",
"lodash.clonedeep": "^4.5.0",
"tslib": "^2.0.0",
"underscore": "^1.9.1",
Expand Down
21 changes: 11 additions & 10 deletions src/classes/RundownManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { ReducedRundown, ReducedSegment, UnrankedSegment } from './RundownWatche
import { literal, parseModifiedDateFromInewsStoryWithFallbackToNow, ReflectPromise } from '../helpers'
import { VERSION } from '../version'
import { SegmentId } from '../helpers/id'
import { ILogger as Logger } from '@tv2media/logger'
import { ILogger } from '@tv2media/logger'
import { CoreHandler } from '../coreHandler'
import { StatusCode } from '@sofie-automation/shared-lib/dist/lib/status'

function isFile(f: INewsDirItem): f is INewsFile {
return f.filetype === 'file'
Expand All @@ -15,11 +17,9 @@ export class RundownManager {
private _listStories!: (queueName: string) => Promise<Array<INewsDirItem>>
private _getStory!: (queueName: string, story: string) => Promise<INewsStory>

constructor(private _logger?: Logger, private inewsConnection?: INewsClient) {
if (this.inewsConnection) {
this._listStories = promisify(this.inewsConnection.list).bind(this.inewsConnection)
this._getStory = promisify(this.inewsConnection.story).bind(this.inewsConnection)
}
constructor(private _logger: ILogger, private inewsConnection: INewsClient, private coreHandler: CoreHandler) {
this._listStories = promisify(this.inewsConnection.list).bind(this.inewsConnection)
this._getStory = promisify(this.inewsConnection.story).bind(this.inewsConnection)
}

/**
Expand Down Expand Up @@ -57,7 +57,8 @@ export class RundownManager {
}
})
} catch (error) {
this._logger?.data(error).error('Error downloading iNews rundown:')
this._logger.data(error).error('Error downloading iNews rundown:')
await this.coreHandler.setStatus(StatusCode.FATAL, ['Error downloading iNews rundown', (error as Error).message])
}
return rundown
}
Expand Down Expand Up @@ -110,15 +111,15 @@ export class RundownManager {
locator: (storyFile as INewsFile).locator,
}
} catch (err) {
this._logger?.error(`Error downloading iNews story: ${err}`)
this._logger.error(`Error downloading iNews story: ${err}`)
return undefined
}

this._logger?.debug('Downloaded : ' + queueName + ' : ' + (storyFile as INewsFile).identifier)
this._logger.debug('Downloaded : ' + queueName + ' : ' + (storyFile as INewsFile).identifier)
/* Add fileId and update modifyDate to ftp reference in storyFile */
story.fields.modifyDate = `${storyFile.modified ? storyFile.modified.getTime() / 1000 : 0}`

this._logger?.debug(`Queue: ${queueName} Story: ${isFile(storyFile) ? storyFile.storyName : storyFile.file}`)
this._logger.debug(`Queue: ${queueName} Story: ${isFile(storyFile) ? storyFile.storyName : storyFile.file}`)

return story
}
Expand Down
185 changes: 68 additions & 117 deletions src/classes/RundownWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { StatusCode } from '@sofie-automation/shared-lib/dist/lib/status'
import { CoreHandler } from '../coreHandler'
import { SegmentRankings, SegmentRankingsInner } from './ParsedINewsToSegments'
import { IngestPlaylist, IngestRundown, IngestSegment } from '@sofie-automation/blueprints-integration'
import { ResolvedPlaylist, ResolveRundownIntoPlaylist } from '../helpers/ResolveRundownIntoPlaylist'
import { ResolvedPlaylistRundown, ResolveRundownIntoPlaylist } from '../helpers/ResolveRundownIntoPlaylist'
import { DiffPlaylist } from '../helpers/DiffPlaylist'
import { PlaylistId, RundownId, SegmentId } from '../helpers/id'
import { Mutex } from 'async-mutex'
Expand Down Expand Up @@ -122,10 +122,9 @@ export type ReducedRundown = Pick<INewsRundown, 'externalId' | 'name' | 'gateway
export type ReducedSegment = Pick<ISegment, 'externalId' | 'modified' | 'rank' | 'name' | 'locator'>
export type UnrankedSegment = Omit<ISegment, 'rank' | 'float' | 'untimed'>

export type PlaylistMap = Map<PlaylistId, { externalId: string; rundowns: RundownId[] }>
export type RundownMap = Map<RundownId, ReducedRundown>

export type PlaylistCache = Map<PlaylistId, RundownId[]>
export type PlaylistCache = Map<PlaylistId, RundownId>
export type RundownCache = Map<RundownId, SegmentId[]>
export type SegmentCache = Map<SegmentId, ReducedSegment>

Expand Down Expand Up @@ -176,15 +175,15 @@ export class RundownWatcher extends EventEmitter {
private lastForcedRankRecalculation: Map<RundownId, number> = new Map()

private cachedINewsData: Map<SegmentId, UnrankedSegment> = new Map()
private cachedPlaylistAssignments: Map<PlaylistId, ResolvedPlaylist> = new Map()
private cachedAssignedRundowns: Map<PlaylistId, Array<INewsRundown>> = new Map()
private cachedPlaylistAssignments: Map<PlaylistId, ResolvedPlaylistRundown> = new Map()
private cachedAssignedRundowns: Map<PlaylistId, INewsRundown> = new Map()
private skipCacheForRundown: Set<RundownId> = new Set()

public playlists: PlaylistCache = new Map()
public rundowns: RundownCache = new Map()
public segments: SegmentCache = new Map()

private processingRundown: Mutex = new Mutex()
private processingRundownMutex: Mutex = new Mutex()

/**
* A Rundown watcher which will poll iNews FTP server for changes and emit events
Expand All @@ -206,7 +205,7 @@ export class RundownWatcher extends EventEmitter {
super()
this._logger = this.logger.tag(this.constructor.name)

this.rundownManager = new RundownManager(this._logger, this.iNewsConnection)
this.rundownManager = new RundownManager(this._logger, this.iNewsConnection, this.coreHandler)

if (!delayStart) {
this.startWatcher()
Expand Down Expand Up @@ -277,15 +276,14 @@ export class RundownWatcher extends EventEmitter {
}

public async ResyncRundown(rundownExternalId: string) {
const release = await this.processingRundown.acquire()
const release = await this.processingRundownMutex.acquire()
const playlistExternalId = rundownExternalId.replace(/_\d+$/, '')
const playlist = this.playlists.get(playlistExternalId)
const rundown = this.rundowns.get(rundownExternalId)

if (!playlist || !rundown) {
this.logger.error(`Rundown ${rundownExternalId} does not exist in playlist ${playlistExternalId}`)
release()
return
throw new Error(`Rundown ${rundownExternalId} does not exist in playlist ${playlistExternalId}`)
}

// Delete cached data for this rundown
Expand All @@ -294,25 +292,11 @@ export class RundownWatcher extends EventEmitter {
this.cachedINewsData.delete(segmentId)
}
this.rundowns.delete(rundownExternalId)
this.playlists.set(
playlistExternalId,
playlist.filter((r) => r !== rundownExternalId)
)
this.playlists.delete(playlistExternalId)

this.cachedPlaylistAssignments.delete(playlistExternalId)
this.cachedAssignedRundowns.delete(playlistExternalId)

const cachedPlaylist = this.cachedPlaylistAssignments.get(playlistExternalId)
if (cachedPlaylist) {
this.cachedPlaylistAssignments.set(
playlistExternalId,
cachedPlaylist.filter((p) => p.rundownId !== rundownExternalId)
)
}
const cachedAssignedRundown = this.cachedAssignedRundowns.get(playlistExternalId)
if (cachedAssignedRundown) {
this.cachedAssignedRundowns.set(
playlistExternalId,
cachedAssignedRundown.filter((p) => p.externalId !== rundownExternalId)
)
}
this.lastForcedRankRecalculation.delete(rundownExternalId)
this.skipCacheForRundown.add(rundownExternalId)
release()
Expand All @@ -327,7 +311,7 @@ export class RundownWatcher extends EventEmitter {
async checkINewsRundownById(rundownId: string): Promise<ReducedRundown> {
const rundown = await this.rundownManager.downloadRundown(rundownId)
if (rundown.gatewayVersion === this.gatewayVersion) {
const release = await this.processingRundown.acquire()
const release = await this.processingRundownMutex.acquire()
try {
await this.processUpdatedRundown(rundown.externalId, rundown)
} catch (e) {
Expand All @@ -349,13 +333,6 @@ export class RundownWatcher extends EventEmitter {
const cachedPlaylist = this.playlists.get(playlistId)

if (cachedPlaylist) {
const cachedRundowns: Array<{ externalId: RundownId; segmentIds: SegmentId[] }> = []
for (const rundownId of cachedPlaylist) {
let cachedRundown = this.rundowns.get(rundownId)
if (!cachedRundown) continue
cachedRundowns.push({ externalId: rundownId, segmentIds: cachedRundown })
}

// Fetch any segments that may have changed
for (const segment of playlist.segments) {
const cachedSegment = this.segments.get(segment.externalId)
Expand Down Expand Up @@ -393,131 +370,105 @@ export class RundownWatcher extends EventEmitter {
`Could not find iNews data for segment ${s.externalId} in rundown ${playlistId}. Segment will appear out of order.`
)
} else {
if (cachedData.iNewsStory.meta.float) {
return
}
segmentsToResolve.push(cachedData)
}
})

const { resolvedPlaylist: playlistAssignments, untimedSegments } = ResolveRundownIntoPlaylist(
playlistId,
segmentsToResolve
)
if (!playlistAssignments.length) {
playlistAssignments.push({
rundownId: `${playlistId}_1`,
segments: [],
})
}
const { resolvedRundown, untimedSegments } = ResolveRundownIntoPlaylist(playlistId, segmentsToResolve)

// Fetch ingestDataCache for segments that have been modified
const ingestDataPromises: Array<Promise<Map<SegmentId, RundownSegment>>> = []

for (const rundown of playlistAssignments) {
if (this.skipCacheForRundown.has(rundown.rundownId)) {
this.skipCacheForRundown.delete(rundown.rundownId)
continue
}
let ingestDataPromise: Promise<Map<SegmentId, RundownSegment>> | undefined = undefined

if (this.skipCacheForRundown.has(resolvedRundown.rundownId)) {
this.skipCacheForRundown.delete(resolvedRundown.rundownId)
} else {
const segmentsToFetch: SegmentId[] = []
for (const segmentId of rundown.segments) {
for (const segmentId of resolvedRundown.segments) {
if (uncachedINewsData.has(segmentId)) {
segmentsToFetch.push(segmentId)
}
}

ingestDataPromises.push(this.coreHandler.GetSegmentsCacheById(rundown.rundownId, segmentsToFetch))
ingestDataPromise = this.coreHandler.GetSegmentsCacheById(resolvedRundown.rundownId, segmentsToFetch)
}

const ingestCacheList = await Promise.all(ingestDataPromises)

const ingestCacheData: Map<SegmentId, RundownSegment> = new Map()

for (let cache of ingestCacheList) {
for (let [segmentId, data] of cache) {
ingestCacheData.set(segmentId, data)
}
}
const ingestCacheData: Map<SegmentId, RundownSegment> = (await ingestDataPromise) ?? new Map()

const assignedRundowns: INewsRundown[] = []

for (const playlistRundown of playlistAssignments) {
const rundownSegments: RundownSegment[] = []

for (const segmentId of playlistRundown.segments) {
const iNewsData = this.cachedINewsData.get(segmentId)
const rundownSegments: RundownSegment[] = []

if (!iNewsData) {
this.logger.error(
`Failed to assign segment ${segmentId} to rundown ${playlistRundown.rundownId}. Could not find cached iNews data`
)
continue
}
for (const segmentId of resolvedRundown.segments) {
const iNewsData = this.cachedINewsData.get(segmentId)

const rundownSegment = new RundownSegment(
playlistRundown.rundownId,
iNewsData.iNewsStory,
iNewsData.modified,
iNewsData.locator,
segmentId,
0,
iNewsData?.name,
untimedSegments.has(segmentId)
if (!iNewsData) {
this.logger.error(
`Failed to assign segment ${segmentId} to rundown ${resolvedRundown.rundownId}. Could not find cached iNews data`
)
rundownSegments.push(rundownSegment)
continue
}

const iNewsRundown: INewsRundown = new INewsRundown(
playlistRundown.rundownId,
playlistRundown.rundownId,
this.gatewayVersion,
rundownSegments,
playlistRundown.payload
const rundownSegment = new RundownSegment(
resolvedRundown.rundownId,
iNewsData.iNewsStory,
iNewsData.modified,
iNewsData.locator,
segmentId,
0,
iNewsData?.name,
untimedSegments.has(segmentId)
)

assignedRundowns.push(iNewsRundown)
rundownSegments.push(rundownSegment)
}

const { changes, segmentChanges } = DiffPlaylist(
assignedRundowns,
this.cachedAssignedRundowns.get(playlistId) ?? []
const iNewsRundown: INewsRundown = new INewsRundown(
resolvedRundown.rundownId,
resolvedRundown.rundownId,
this.gatewayVersion,
rundownSegments,
resolvedRundown.payload
)

assignedRundowns.push(iNewsRundown)

const { playlistChanges: changes, segmentChanges } = DiffPlaylist(
iNewsRundown,
this.cachedAssignedRundowns.get(playlistId)
)

this.cachedPlaylistAssignments.set(playlistId, playlistAssignments)
this.cachedAssignedRundowns.set(playlistId, assignedRundowns)
this.cachedPlaylistAssignments.set(playlistId, resolvedRundown)
this.cachedAssignedRundowns.set(playlistId, iNewsRundown)

let segmentRanks = AssignRanksToSegments(
playlistAssignments,
let { assignedRanks, recalculatedAsIntegers } = AssignRanksToSegments(
resolvedRundown,
changes,
segmentChanges,
this.previousRanks,
this.lastForcedRankRecalculation
)
const assignedRanks: Map<SegmentId, number> = new Map()
for (const rundown of segmentRanks) {
if (rundown.recalculatedAsIntegers) {
this.lastForcedRankRecalculation.set(rundown.rundownId, Date.now())
}
if (recalculatedAsIntegers) {
this.lastForcedRankRecalculation.set(resolvedRundown.rundownId, Date.now())
}

for (const [segmentId, rank] of rundown.assignedRanks) {
assignedRanks.set(segmentId, rank)
}
this.updatePreviousRanks(rundown.rundownId, rundown.assignedRanks)
for (const [segmentId, rank] of assignedRanks) {
assignedRanks.set(segmentId, rank)
}
this.updatePreviousRanks(resolvedRundown.rundownId, assignedRanks)

for (const segment of playlist.segments) {
this.segments.set(segment.externalId, segment)
}
for (const rundown of playlistAssignments) {
this.rundowns.set(rundown.rundownId, rundown.segments)
}
this.playlists.set(
playlistId,
playlistAssignments.map((r) => r.rundownId)
)
this.rundowns.set(resolvedRundown.rundownId, resolvedRundown.segments)

this.playlists.set(playlistId, resolvedRundown.rundownId)

const coreCalls = GenerateCoreCalls(
playlistId,
changes,
playlistAssignments,
resolvedRundown,
assignedRanks,
this.cachedINewsData,
ingestCacheData,
Expand Down
Loading

0 comments on commit a5a22f8

Please sign in to comment.