Skip to content

Commit

Permalink
Merge pull request #95 from nebulabroadcast/backend-rundown-enhancements
Browse files Browse the repository at this point in the history
Backend rundown enhancements
  • Loading branch information
martastain authored Nov 23, 2024
2 parents c94c0ed + d4db899 commit 9a6c6b3
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 54 deletions.
10 changes: 8 additions & 2 deletions backend/api/delete.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncpg
from fastapi import Response
from pydantic import Field

Expand Down Expand Up @@ -46,8 +47,13 @@ async def handle(
query = "DELETE FROM items WHERE id = ANY($1) RETURNING id, id_bin"
affected_bins = set()
nebula.log.debug(f"Deleted items: {request.ids}", user=user.name)
async for row in nebula.db.iterate(query, request.ids):
affected_bins.add(row["id_bin"])
try:
async for row in nebula.db.iterate(query, request.ids):
affected_bins.add(row["id_bin"])
except asyncpg.exceptions.ForeignKeyViolationError as e:
raise nebula.ConflictException(
"Cannot delete item because it was already aired"
) from e
await bin_refresh(list(affected_bins), initiator=initiator)
return Response(status_code=204)

Expand Down
96 changes: 67 additions & 29 deletions backend/api/rundown/get_rundown.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import time

import nebula
from nebula.enum import ObjectStatus, RunMode
from nebula.helpers.scheduling import (
get_item_runs,
get_pending_assets,
parse_rundown_date,
)
from nebula.helpers.scheduling import get_pending_assets, parse_rundown_date

from .models import RundownRequestModel, RundownResponseModel, RundownRow

Expand All @@ -14,9 +12,9 @@ async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:
if not (channel := nebula.settings.get_playout_channel(request.id_channel)):
raise nebula.BadRequestException(f"No such channel: {request.id_channel}")

request_start_time = time.monotonic()
start_time = parse_rundown_date(request.date, channel)
end_time = start_time + (3600 * 24)
item_runs = await get_item_runs(request.id_channel, start_time, end_time)
pending_assets = await get_pending_assets(channel.send_action)
pskey = f"playout_status/{request.id_channel}"

Expand All @@ -27,19 +25,45 @@ async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:
e.id_magic AS id_bin,
i.id AS id_item,
i.meta AS imeta,
a.meta AS ameta
FROM
events AS e
LEFT JOIN
items AS i
ON
e.id_magic = i.id_bin
LEFT JOIN
assets AS a
ON
i.id_asset = a.id
a.meta AS ameta,
ar.latest_start AS as_start,
ar.latest_stop AS as_stop
FROM events AS e
LEFT JOIN items AS i
ON e.id_magic = i.id_bin
LEFT JOIN assets AS a
ON i.id_asset = a.id
LEFT JOIN (
SELECT
id_item,
start AS latest_start,
stop AS latest_stop
FROM (
SELECT
id_item,
start,
stop,
ROW_NUMBER() OVER
(PARTITION BY id_item ORDER BY start DESC) AS rn
FROM asrun
WHERE
id_channel = $1
AND start >= $2 - 604800
AND start < $3 + 604800
) AS ranked
WHERE rn = 1
) AS ar
ON i.id = ar.id_item
WHERE
e.id_channel = $1 AND e.start >= $2 AND e.start < $3
e.id_channel = $1
AND e.start >= $2
AND e.start < $3
ORDER BY
e.start ASC,
i.position ASC,
Expand Down Expand Up @@ -102,10 +126,12 @@ async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:
# TODO: append empty row?
continue

airstatus = 0
if (runs := item_runs.get(id_item)) is not None:
as_start, as_stop = runs
ts_broadcast = as_start
airstatus: ObjectStatus | None = None

if (as_start := record["as_start"]) is not None:
if as_start > ts_broadcast:
ts_broadcast = as_start
as_stop = record["as_stop"]
airstatus = ObjectStatus.AIRED if as_stop else ObjectStatus.ONAIR

# TODO
Expand All @@ -114,19 +140,27 @@ async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:

# Row status

istatus = 0
istatus: ObjectStatus
if not ameta:
# virtual item. consider it online
istatus = ObjectStatus.ONLINE
elif airstatus:
istatus = airstatus
elif ameta.get("status") == ObjectStatus.OFFLINE:
# media is not on the production storage
istatus = ObjectStatus.OFFLINE
elif pskey not in ameta or ameta[pskey]["status"] == ObjectStatus.OFFLINE:
# media is not on the playout storage
istatus = ObjectStatus.REMOTE
elif ameta[pskey]["status"] == ObjectStatus.ONLINE:
istatus = ObjectStatus.ONLINE
elif ameta[pskey]["status"] == ObjectStatus.CORRUPTED:
# media is on the playout storage but corrupted
istatus = ObjectStatus.CORRUPTED
elif ameta[pskey]["status"] == ObjectStatus.ONLINE:
if airstatus is not None:
# media is on the playout storage and aired
istatus = airstatus
last_air = as_start
else:
# media is on the playout storage but not aired
istatus = ObjectStatus.ONLINE
else:
istatus = ObjectStatus.UNKNOWN

Expand Down Expand Up @@ -192,8 +226,12 @@ async def get_rundown(request: RundownRequestModel) -> RundownResponseModel:
if not last_event.duration:
last_event.broadcast_time = ts_broadcast
ts_scheduled += duration
ts_broadcast += duration
if row.item_role not in ["placeholder", "lead_in", "lead_out"]:
ts_broadcast += duration
last_event.duration += duration
last_event.is_empty = False

return RundownResponseModel(rows=rows)
elapsed = time.monotonic() - request_start_time
msg = f"Rundown generated in {elapsed:.3f} seconds"

return RundownResponseModel(rows=rows, detail=msg)
1 change: 1 addition & 0 deletions backend/api/rundown/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ class RundownRow(ResponseModel):

class RundownResponseModel(ResponseModel):
rows: list[RundownRow] = Field(default_factory=list)
detail: str | None = Field(None)
50 changes: 29 additions & 21 deletions backend/api/scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncpg
from nxtools import format_time

import nebula
Expand Down Expand Up @@ -45,8 +46,12 @@ async def delete_events(ids: list[int], user: nebula.User | None = None) -> list
"DELETE FROM items WHERE id_bin = $1",
id_bin,
)
except asyncpg.exceptions.ForeignKeyViolationError as e:
raise nebula.ConflictException(
"Cannot delete event containing aired items"
) from e
except Exception:
nebula.log.error(f"Failed to delete items of {event}")
nebula.log.traceback(f"Failed to delete items of {event}")
continue

await nebula.db.execute("DELETE FROM bins WHERE id = $1", id_bin)
Expand Down Expand Up @@ -75,35 +80,38 @@ async def get_events_in_range(
f"from {format_time(int(start_time))} to {format_time(int(end_time))}",
user=username,
)

result = []

# Last event before start_time
# Events between start_time and end_time
# and the last event before end_time
async for row in nebula.db.iterate(
"""
SELECT e.meta as emeta, o.meta as ometa FROM events AS e, bins AS o
WHERE
e.id_channel=$1
(
SELECT
e.meta AS emeta,
o.meta AS ometa,
e.start
FROM events AS e, bins AS o
WHERE
e.id_channel = $1
AND e.start < $2
AND e.id_magic = o.id
ORDER BY start DESC LIMIT 1
""",
id_channel,
start_time,
):
rec = row["emeta"]
rec["duration"] = row["ometa"].get("duration")
result.append(nebula.Event.from_meta(rec))

# Events between start_time and end_time
async for row in nebula.db.iterate(
"""
SELECT e.meta as emeta, o.meta as ometa FROM events AS e, bins AS o
WHERE
e.id_channel=$1
ORDER BY e.start DESC
LIMIT 1
)
UNION ALL
(
SELECT
e.meta AS emeta,
o.meta AS ometa,
e.start
FROM events AS e, bins AS o
WHERE
e.id_channel = $1
AND e.start >= $2
AND e.start < $3
AND e.id_magic = o.id
)
ORDER BY start ASC
""",
id_channel,
Expand Down
17 changes: 15 additions & 2 deletions backend/api/solve.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fastapi import Response
from pydantic import Field

import nebula
from nebula.exceptions import BadRequestException
from nebula.plugins.library import plugin_library
from server.dependencies import CurrentUser
Expand Down Expand Up @@ -29,8 +30,20 @@ async def handle(
request: SolveRequestModel,
user: CurrentUser,
) -> Response:
# TODO: check permissions
assert user is not None
# Get the list of channels of the requested items

query = """
SELECT DISTINCT id_channel
FROM events e
INNER JOIN items i
ON e.id_magic = i.id_bin
WHERE i.id = ANY($1)
"""

err_msg = "You are not allowed to edit this rundown"
async for row in nebula.db.iterate(query, request.items):
if not user.can("rundown_edit", row["id_channel"]):
raise nebula.ForbiddenException(err_msg)

try:
solver = plugin_library.get("solver", request.solver)
Expand Down

0 comments on commit 9a6c6b3

Please sign in to comment.