-
Notifications
You must be signed in to change notification settings - Fork 0
/
step-trim.py
131 lines (113 loc) · 4.9 KB
/
step-trim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#
# This software is Copyright ©️ 2020 The University of Southern California. All Rights Reserved.
# Permission to use, copy, modify, and distribute this software and its documentation for educational, research and non-profit purposes, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and subject to the full license file found in the root of this software deliverable. Permission to make commercial use of this software may be obtained by contacting: USC Stevens Center for Innovation University of Southern California 1150 S. Olive Street, Suite 2300, Los Angeles, CA 90115, USA Email: [email protected]
#
# The full terms of this copyright and license should always be found in the root directory of this software deliverable as "license.txt" and if these terms are not found with this software, please contact the USC Stevens Center for the full license.
#
import boto3
import tempfile
import os
from media_tools import get_file_mime, video_trim, get_video_encoding_type
from module.utils import (
s3_bucket,
load_sentry,
require_env,
fetch_from_graphql,
)
from module.constants import MP4, WEBM_VP9, Supported_Video_Type
from module.api import (
UpdateTaskStatusRequest,
upload_task_status_update,
)
from module.logger import get_logger
# Module-level setup: everything below runs once, at import time.
# NOTE(review): load_sentry presumably wires up Sentry error reporting and is
# called before anything else here — confirm against module.utils.
load_sentry()
log = get_logger("upload-answer-trim")
# NOTE(review): require_env presumably fails fast when the variable is unset —
# confirm against module.utils.
aws_region = require_env("REGION")
# boto3 clients are created once and shared by every call in this module.
s3_client = boto3.client("s3", region_name=aws_region)
# `sns` and `upload_bucket` are not referenced in the visible part of this
# file; they are kept because other code may rely on them (and on the
# SIGNED_UPLOAD_BUCKET lookup happening at import time).
sns = boto3.client("sns", region_name=aws_region)
upload_bucket = require_env("SIGNED_UPLOAD_BUCKET")
def get_original_video_url(
    mentor: str, question: str, video_file_type: Supported_Video_Type
) -> str:
    """Return the URL of the original video for a mentor/question pair.

    The URL has the form
    ``<STATIC_URL_BASE>/videos/<mentor>/<question>/original.<extension>``,
    where ``STATIC_URL_BASE`` is read from the environment (empty string when
    unset) and ``<extension>`` is ``video_file_type.extension``.
    """
    static_root = os.environ.get("STATIC_URL_BASE", "")
    file_name = f"original.{video_file_type.extension}"
    return f"{static_root}/videos/{mentor}/{question}/{file_name}"
def _select_output_video_type(work_file, is_vbg_video):
    """Pick the video type the trimmed file should be written as.

    Only requests flagged ``isVbgVideo`` are probed: a file whose mime type is
    ``video/webm`` and whose encoding is ``vp9`` yields ``WEBM_VP9``. Every
    other case, including a failed probe, yields ``MP4``.
    """
    if not is_vbg_video:
        return MP4
    try:
        file_mime_type = get_file_mime(work_file)
        file_encoding = get_video_encoding_type(work_file)
    except Exception:
        # Deliberate best effort: a failed probe must not abort the trim, so
        # fall back to mp4 but keep the traceback in the log.
        log.warning(
            "Failed to determine mime and encoding type for %s, defaulting to mp4",
            work_file,
            exc_info=True,
        )
        return MP4
    if file_mime_type == "video/webm" and file_encoding == "vp9":
        return WEBM_VP9
    return MP4


def process_task(request):
    """Trim the video named in ``request`` and upload the result to S3.

    Steps, in order:
      1. Fetch the stored ``trimUploadTask`` via GraphQL; return early if it
         is missing or its status starts with ``CANCEL``.
      2. Download ``request["video"]`` from ``s3_bucket`` into a temporary
         directory and mark the task ``IN_PROGRESS``.
      3. Trim the file between ``request["trim"]["start"]`` and
         ``request["trim"]["end"]``.
      4. Upload the trimmed file to
         ``videos/<mentor>/<question>/original.<extension>`` and mark the task
         ``DONE``.

    Args:
        request: mapping with the keys ``authHeaders``, ``mentor``,
            ``question``, ``video`` and ``trim`` (itself holding ``start`` and
            ``end``), plus the optional flag ``isVbgVideo`` (default False).

    Returns:
        None.

    Raises:
        KeyError: if a required key is missing from ``request``.
        Exception: errors from the S3 client, ``video_trim`` or the status
            update calls are not caught here and propagate to the caller.
            NOTE(review): in that case the task status is left as
            ``IN_PROGRESS``; confirm whether a failure status should be sent.
    """
    auth_headers = request["authHeaders"]
    mentor = request["mentor"]
    question = request["question"]
    stored_task = fetch_from_graphql(mentor, question, "trimUploadTask", auth_headers)
    if not stored_task:
        log.warning("task not found, skipping trim")
        return
    if stored_task["status"].startswith("CANCEL"):
        log.info("task cancelled, skipping trim")
        return
    with tempfile.TemporaryDirectory() as work_dir:
        work_file = os.path.join(work_dir, "original_video")  # don't assume file type
        s3_client.download_file(s3_bucket, request["video"], work_file)
        log.info("%s downloaded to %s", request["video"], work_dir)
        upload_task_status_update(
            UpdateTaskStatusRequest(
                mentor=mentor,
                question=question,
                trim_upload_task={"status": "IN_PROGRESS"},
            ),
            auth_headers,
        )
        desired_video_file_type = _select_output_video_type(
            work_file, request.get("isVbgVideo", False)
        )
        log.info("trimming file %s", work_file)
        trim_file = f"{work_file}-trim.{desired_video_file_type.extension}"
        video_trim(
            work_file,
            trim_file,
            request["trim"]["start"],
            request["trim"]["end"],
            desired_video_file_type,
        )
        log.info("trim completed")
        # The trimmed file is stored under the "original" key for this
        # mentor/question pair.
        s3_path = f"videos/{mentor}/{question}"
        s3_client.upload_file(
            trim_file,
            s3_bucket,
            f"{s3_path}/original.{desired_video_file_type.extension}",
            ExtraArgs={"ContentType": desired_video_file_type.mime},
        )
        log.info("trimmed video uploaded")
        upload_task_status_update(
            UpdateTaskStatusRequest(
                mentor=mentor,
                question=question,
                trim_upload_task={"status": "DONE"},
            ),
            auth_headers,
        )
def handler(event, context):
    """Entry point: run the trim task described by ``event["request"]``.

    The event is logged, then the request is passed to ``process_task`` only
    when it carries a truthy ``trimUploadTask`` entry; otherwise a warning is
    logged and nothing else happens. ``context`` is unused. Returns None.
    """
    log.info(event)
    request = event["request"]
    if "trimUploadTask" not in request or not request["trimUploadTask"]:
        log.warning("no trim task requested")
        return
    process_task(request)
# # for local debugging (uncomment the lines below):
# if __name__ == "__main__":
#     import json  # not imported at the top of this file
#     with open("__events__/step-function-event.json.dist") as f:
#         event = json.loads(f.read())
#     handler(event, {})