Merge pull request #49 from nebulabroadcast/refactor/general_clean_up
General clean up
martastain authored Feb 3, 2024
2 parents 08a42d0 + 6f2bfae commit 8b03909
Showing 23 changed files with 284 additions and 198 deletions.
7 changes: 5 additions & 2 deletions backend/api/auth.py
@@ -150,7 +150,10 @@ async def handle(self, authorization: str | None = Header(None)):
class SetPassword(APIRequest):
    """Set a new password for the current (or a given) user.
-    In order to set a password for another user, the current user must be an admin.
+    Normal users can only change their own password.
+    In order to set a password for another user,
+    the current user must be an admin, otherwise a 403 error is returned.
    """

    name: str = "password"
@@ -163,7 +166,7 @@ async def handle(
    ) -> Response:
        if request.login:
            if not user.is_admin:
-                raise nebula.UnauthorizedException(
+                raise nebula.ForbiddenException(
                    "Only admin can change other user's password"
                )
        query = "SELECT meta FROM users WHERE login = $1"
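The exception swap is a semantic fix: HTTP 401 signals a missing or invalid authentication, while HTTP 403 signals an authenticated user who lacks permission, which is the case here. A minimal client-side sketch of the new behaviour; the endpoint path, payload shape, base URL, and token are illustrative assumptions, not taken from the diff:

import httpx

# Hypothetical call: a non-admin token trying to set another user's
# password should now receive 403 (forbidden), not 401 (unauthenticated).
response = httpx.post(
    "https://nebula.example.com/api/password",  # assumed endpoint path
    json={"login": "someone_else", "password": "n3w-p4ssw0rd"},
    headers={"Authorization": "Bearer <non-admin-token>"},
)
assert response.status_code == 403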
7 changes: 3 additions & 4 deletions backend/api/jobs/jobs.py
@@ -1,7 +1,6 @@
import time
from typing import Literal

-from fastapi import Response
from nxtools import slugify
from pydantic import Field

@@ -95,7 +94,7 @@ class JobsItemModel(RequestModel):


class JobsResponseModel(ResponseModel):
-    jobs: list[JobsItemModel] = Field(default_factory=list)
+    jobs: list[JobsItemModel] | None = Field(default=None)


async def can_user_control_job(user: nebula.User, id_job: int) -> bool:
@@ -192,7 +191,7 @@ async def handle(
        self,
        request: JobsRequestModel,
        user: CurrentUser,
-    ) -> JobsResponseModel | Response:
+    ) -> JobsResponseModel:
        if request.abort:
            await abort_job(request.abort, user)

@@ -227,7 +226,7 @@ async def handle(
            # failed
            conds.append("j.status IN (3)")
        elif request.view is None:
-            return Response(status_code=204)
+            return JobsResponseModel()

        if request.asset_ids is not None:
            ids = ",".join([str(id) for id in request.asset_ids])
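This change removes the bare HTTP 204 branch: the endpoint now always returns a JSON body, and "no view requested" is expressed in-band as jobs being null. A standalone sketch of the changed model, with a plain pydantic BaseModel standing in for Nebula's ResponseModel base:

from pydantic import BaseModel, Field


class JobsItemModel(BaseModel):  # trimmed stand-in for the real model
    id: int


class JobsResponseModel(BaseModel):
    jobs: list[JobsItemModel] | None = Field(default=None)


# Before: jobs defaulted to [] and "nothing to show" was an empty 204
# response. After: the response is always a valid JSON document, and the
# "no view" case serializes as {"jobs": null}.
assert JobsResponseModel().jobs is None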
65 changes: 28 additions & 37 deletions backend/api/proxy.py
@@ -1,36 +1,26 @@
import os

import aiofiles
-from fastapi import Header, HTTPException, Request, Response, status
-from fastapi.responses import StreamingResponse
+from fastapi import HTTPException, Request, Response, status

import nebula
-from server.dependencies import CurrentUserInQuery
+from server.dependencies import CurrentUser
from server.request import APIRequest


-async def send_bytes_range_requests(file_name: str, start: int, end: int):
-    """Send a file in chunks using Range Requests specification RFC7233
-
-    `start` and `end` parameters are inclusive due to specification
-    """
-    CHUNK_SIZE = 1024 * 8
-
-    sent_bytes = 0
-    try:
-        async with aiofiles.open(file_name, mode="rb") as f:
-            await f.seek(start)
-            pos = start
-            while pos < end:
-                read_size = min(CHUNK_SIZE, end - pos + 1)
-                data = await f.read(read_size)
-                yield data
-                pos += len(data)
-                sent_bytes += len(data)
-    finally:
-        nebula.log.trace(
-            f"Finished sending file {start}-{end}. Sent {sent_bytes} bytes. Expected {end-start+1} bytes"
-        )
+class ProxyResponse(Response):
+    content_type = "video/mp4"
+
+
+async def get_bytes_range(file_name: str, start: int, end: int) -> bytes:
+    """Get a range of bytes from a file"""
+
+    async with aiofiles.open(file_name, mode="rb") as f:
+        await f.seek(start)
+        pos = start
+        # read_size = min(CHUNK_SIZE, end - pos + 1)
+        read_size = end - pos + 1
+        return await f.read(read_size)

def _get_range_header(range_header: str, file_size: int) -> tuple[int, int]:
@@ -52,10 +42,13 @@ def _invalid_range():
    return start, end


-async def range_requests_response(request: Request, file_path: str, content_type: str):
+async def range_requests_response(
+    request: Request, file_path: str, content_type: str
+) -> ProxyResponse:
"""Returns StreamingResponse using Range Requests of a given file"""

    file_size = os.stat(file_path).st_size
+    max_chunk_size = 1024 * 1024  # 1 MiB
    range_header = request.headers.get("range")

    headers = {
@@ -74,22 +67,21 @@ async def range_requests_response(request: Request, file_path: str, content_type

    if range_header is not None:
        start, end = _get_range_header(range_header, file_size)
+        end = min(end, start + max_chunk_size - 1)
        size = end - start + 1
        headers["content-length"] = str(size)
        headers["content-range"] = f"bytes {start}-{end}/{file_size}"
        status_code = status.HTTP_206_PARTIAL_CONTENT

-    return StreamingResponse(
-        send_bytes_range_requests(file_path, start, end),
+    payload = await get_bytes_range(file_path, start, end)
+
+    return ProxyResponse(
+        content=payload,
        headers=headers,
        status_code=status_code,
    )


-class ProxyResponse(Response):
-    content_type = "video/mp4"


class ServeProxy(APIRequest):
    """Serve a low-res (proxy) media for a given asset.
@@ -100,16 +92,15 @@ class ServeProxy(APIRequest):
    name: str = "proxy"
    path: str = "/proxy/{id_asset}"
    title: str = "Serve proxy"
+    response_class = ProxyResponse
    methods = ["GET"]

    async def handle(
        self,
        request: Request,
        id_asset: int,
-        user: CurrentUserInQuery,
-        range: str = Header(None),
-    ):
+        user: CurrentUser,
+    ) -> ProxyResponse:
        assert user
        sys_settings = nebula.settings.system
        proxy_storage_path = nebula.storages[sys_settings.proxy_storage].local_path
        proxy_path_template = os.path.join(proxy_storage_path, sys_settings.proxy_path)
@@ -123,10 +114,10 @@ async def handle(

        if not os.path.exists(video_path):
            # maybe return content too? with a placeholder image?
-            return Response(status_code=404, content="Not found")
+            raise nebula.NotFoundException("Proxy not found")

        try:
            return await range_requests_response(request, video_path, "video/mp4")
-        except Exception:
+        except Exception as e:
            nebula.log.traceback("Error serving proxy")
-            return Response(status_code=500, content="Internal server error")
+            raise nebula.NebulaException("Error serving proxy") from e
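The rewrite replaces the 8 KiB streaming generator with a single read capped at max_chunk_size, so each request returns at most about 1 MiB and the client keeps issuing Range requests for the rest (a browser's video element does this natively). A hedged client-side sketch of that loop; it works against any endpoint that answers 206 with a Content-Range header, and the URL and transport details are assumptions:

import httpx


def fetch_all(url: str, chunk: int = 1024 * 1024) -> bytes:
    """Download a resource by repeatedly requesting byte ranges."""
    parts = []
    start = 0
    while True:
        headers = {"Range": f"bytes={start}-{start + chunk - 1}"}
        resp = httpx.get(url, headers=headers)
        resp.raise_for_status()
        if resp.status_code != 206:
            return resp.content  # server ignored the Range header
        parts.append(resp.content)
        # Content-Range looks like "bytes <start>-<end>/<total>"
        total = int(resp.headers["content-range"].rsplit("/", 1)[1])
        start += len(resp.content)
        if start >= total:
            return b"".join(parts)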
6 changes: 2 additions & 4 deletions backend/api/sessions.py
@@ -16,7 +16,6 @@ class Sessions(APIRequest):

    name = "sessions"
    title = "List sessions"
-    response_model = list[SessionModel]

    async def handle(
        self,
@@ -55,15 +54,14 @@ class InvalidateSession(APIRequest):
    """

    name = "invalidate_session"
-    title = "Invalidate a session"
-    responses = [204, 201]
+    title = "Invalidate session"
+    responses = [204]

    async def handle(
        self,
        payload: InvalidateSessionRequest,
        user: CurrentUser,
    ) -> Response:
-
        session = await Session.check(payload.token)
        if session is None:
            raise nebula.NotFoundException("Session not found")
46 changes: 36 additions & 10 deletions backend/manage
@@ -1,5 +1,7 @@
#!/bin/bash

+SERVER_TYPE=${NEBULA_SERVER_TYPE:-gunicorn}
+
if [ $# -ne 1 ]; then
    echo "Error: a single argument is required"
    exit 1
@@ -12,34 +14,58 @@ function start_server () {

    # Run setup to make sure database is up to date
    python -m setup


-    # Start gunicorn
-    exec gunicorn \
-        -k uvicorn.workers.UvicornWorker \
-        --log-level warning \
-        -b :80 \
-        server.server:app
+    if [ $SERVER_TYPE = "gunicorn" ]; then
+        exec gunicorn \
+            -k uvicorn.workers.UvicornWorker \
+            --log-level warning \
+            -b :80 \
+            server.server:app
+    elif [ $SERVER_TYPE = "granian" ]; then
+        exec granian \
+            --interface asgi \
+            --log-level warning \
+            --host 0.0.0.0 \
+            --port 80 \
+            server.server:app
+    else
+        echo ""
+        echo "Error: invalid server type '$SERVER_TYPE'. Expected 'gunicorn' or 'granian'"
+        echo ""
+        exit 1
+    fi
}


+function get_server_pid () {
+    if [ $SERVER_TYPE = "gunicorn" ]; then
+        pid=$(ps aux | grep 'gunicorn' | awk '{print $2}')
+    elif [ $SERVER_TYPE = "granian" ]; then
+        pid=$(ps aux | grep 'granian' | awk '{print $2}')
+    fi
+    echo $pid
+}
+
+
function stop_server () {
    echo ""
    echo "SIGTERM signal received. Shutting down..."
    echo ""
-    pid=$(ps aux | grep 'gunicorn' | awk '{print $2}')
-    kill -TERM $pid 2> /dev/null
+    kill -TERM $(get_server_pid) 2> /dev/null
    exit 0
}

function reload_server () {
    echo ""
    echo "Reloading the server..."
    echo ""
-    pid=$(ps aux | grep 'gunicorn' | awk '{print $2}')
-    kill -HUP $pid 2> /dev/null
+    kill -HUP $(get_server_pid) 2> /dev/null
    exit 0
}

trap stop_server SIGTERM SIGINT
trap reload_server SIGHUP


if [ $1 = "start" ]; then
2 changes: 1 addition & 1 deletion backend/mypy.ini
@@ -1,7 +1,7 @@
[mypy]
python_version = 3.10
ignore_missing_imports = false
-check_untyped_defs = false
+check_untyped_defs = true
strict=false
files=./**/*.py
exclude=(tests/|venv/)
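Enabling check_untyped_defs makes mypy type-check the bodies of functions that have no annotations, which it otherwise skips entirely. An illustrative example of the kind of error the stricter setting now reports:

def describe_jobs():  # no annotations, so previously unchecked
    count = "three"
    # With check_untyped_defs = true, mypy reports here:
    # Unsupported operand types for + ("str" and "int")
    return count + 1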
36 changes: 31 additions & 5 deletions backend/nebula/enum.py
@@ -2,16 +2,42 @@


class ObjectStatus(enum.IntEnum):
+    """Object status enumeration.
+
+    This enumeration is used to indicate the status of an object.
+    Objects can be in one of the following states:
+
+    - OFFLINE: Object is in the database, but not available on the filesystem.
+    - ONLINE: Object is in the database and available on the filesystem.
+    - CREATING: Media file exists, but was changed recently, so its metadata
+      is being updated.
+    - TRASHED: Object has been marked as deleted, but is still available on
+      the filesystem. It will be deleted permanently after some time.
+    - ARCHIVED: Object has been marked as archived, but is still available on
+      the filesystem. It will be deleted permanently after some time.
+    - RESET: User has requested to reset the metadata of the object.
+      This triggers a re-scan of the media file metadata.
+    - CORRUPTED: Object is corrupted and cannot be used.
+    - REMOTE: Object is not available on the filesystem, but is available on
+      a remote storage (typically a playout item whose media file is on a
+      production storage, but has not been copied to the playout storage yet).
+    - UNKNOWN: Object status is unknown.
+    - AIRED: Only for items. Item has already been broadcast.
+    - ONAIR: Only for items. Item is currently being broadcast.
+    - RETRIEVING: Asset is marked for retrieval from a remote/archive storage.
+    """

    OFFLINE = 0
    ONLINE = 1
-    CREATING = 2  # File exists, but was changed recently.
-    TRASHED = 3  # File has been moved to trash location.
-    ARCHIVED = 4  # File has been moved to archive location.
-    RESET = 5  # Reset metadata action has been invoked.
+    CREATING = 2
+    TRASHED = 3
+    ARCHIVED = 4
+    RESET = 5
    CORRUPTED = 6
    REMOTE = 7
    UNKNOWN = 8
-    AIRED = 9  # Auxiliary value.
+    AIRED = 9
    ONAIR = 10
    RETRIEVING = 11

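Because ObjectStatus is an IntEnum, values stored as plain integers in object metadata round-trip cleanly. A small illustration, assuming the backend package is importable; the ON_DISK grouping is invented for the example and is not part of the codebase:

from nebula.enum import ObjectStatus  # assumes the backend package on sys.path

status = ObjectStatus(7)
assert status is ObjectStatus.REMOTE
assert status == 7  # IntEnum compares equal to its raw integer value

# Hypothetical grouping: statuses whose media file should exist locally.
ON_DISK = {ObjectStatus.ONLINE, ObjectStatus.CREATING, ObjectStatus.TRASHED}
print(status in ON_DISK)  # False: REMOTE media lives on another storage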
20 changes: 20 additions & 0 deletions backend/nebula/log.py
@@ -1,4 +1,5 @@
import enum
+import logging
import sys
import traceback

@@ -70,3 +71,22 @@ def critical(self, *args, **kwargs):


log = Logger()
+
+# Add a custom logging handler to the standard logging module.
+# This allows us to use the standard logging module with the same
+# format, log level and consumers as the primary Nebula logger.
+# This is useful for 3rd party libraries.
+
+
+class CustomHandler(logging.Handler):
+    def emit(self, record):
+        log_message = self.format(record)
+        name = record.name
+        log(LogLevel(record.levelno // 10), log_message, user=name)
+
+
+root_logger = logging.getLogger()
+root_logger.setLevel(log.level * 10)
+
+custom_handler = CustomHandler()
+root_logger.addHandler(custom_handler)
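The handler bridges stdlib logging into Nebula's logger; the levelno // 10 trick maps the standard levels (DEBUG=10, INFO=20, ...) onto Nebula's compact LogLevel scale. A self-contained sketch of the same pattern, with a print function standing in for the real Logger instance:

import logging


def nebula_log(level: int, message: str, user: str) -> None:
    """Stand-in for Nebula's Logger: prints instead of dispatching."""
    print(f"[{level}] {user}: {message}")


class BridgeHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        # Standard levels are multiples of 10, so integer division
        # maps them onto a 1..5 scale (DEBUG=1 ... CRITICAL=5).
        nebula_log(record.levelno // 10, self.format(record), user=record.name)


logging.getLogger().addHandler(BridgeHandler())
logging.getLogger().setLevel(logging.INFO)

# Any third-party library that uses stdlib logging now flows through the bridge:
logging.getLogger("httpx").warning("retrying request")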
3 changes: 3 additions & 0 deletions backend/nebula/objects/event.py
@@ -17,4 +17,7 @@ class Event(BaseObject):
    }

    async def delete_children(self):
+        assert self.connection is not None
+        assert hasattr(self.connection, "execute")
+        assert self.id
        await self.connection.execute("DELETE FROM bins WHERE id_magic = $1", self.id)
4 changes: 2 additions & 2 deletions backend/nebula/plugins/solver.py
@@ -210,14 +210,14 @@ async def main(self):
            item["position"] = i
            await item.save(notify=False)

-        if self.bin.id not in self.affected_bins:
+        if self.bin.id and self.bin.id not in self.affected_bins:
            self.affected_bins.append(self.bin.id)

        # save event in case solver updated its metadata
        await self.event.save()

        # another placeholder was created, so we need to solve it
-        if self._solve_next:
+        if self._solve_next and self._solve_next.id:
            await self(self._solve_next.id)
            return

(Diffs for the remaining changed files were not expanded.)