-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
149 lines (126 loc) · 6.73 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import datetime
import argparse
import signal
import pathlib
import traceback
import types
import typing
import faster_whisper
from common.objects import ShareObjects
from utils import CONFIG, get_files, get_logger, remove_file
from utils.file import need_translation
from utils import srt_writer, video2audio
from managers.database import DatabaseManager
from managers.web import WebManager
from managers.record import HistoryRecordManager, TranslationRecordManager
logger = get_logger("main")
def init():
logger.debug(CONFIG)
ShareObjects.dbm = DatabaseManager()
ShareObjects.history_record_manager = HistoryRecordManager()
ShareObjects.translation_record_manager = TranslationRecordManager()
ShareObjects.current_status = "init"
ShareObjects.current_srt = ""
ShareObjects.current_audio = ""
signal.signal(signal.SIGTERM, handle_exit)
signal.signal(signal.SIGINT, handle_exit)
if CONFIG.Web.enable:
WebManager().start()
def handle_exit(sig_num: int, frame: typing.Optional[types.FrameType]) -> None:
logger.info(f"received signum: {sig_num}, frame: {frame}")
if ShareObjects.current_status == "transcribe":
if ShareObjects.current_srt != "" and ShareObjects.current_srt.endswith(".srt"):
logger.info(f"now clean srt file {ShareObjects.current_srt}")
remove_file(ShareObjects.current_srt)
if ShareObjects.current_audio != "":
logger.info(f"now clean audio file {ShareObjects.current_audio}")
remove_file(ShareObjects.current_audio)
if ShareObjects.current_status == "video2audio":
if ShareObjects.current_audio != "":
logger.info(f"now clean audio file {ShareObjects.current_audio}")
remove_file(ShareObjects.current_audio)
logger.info("graceful exit now...")
exit(0)
def get_srt_filepath(video_file_fullpath: str) -> str:
vf = pathlib.Path(video_file_fullpath)
return str(vf.parent.joinpath(f"{vf.stem}.srt"))
def main():
init()
logger.info('loop setting targets')
recorder = HistoryRecordManager()
translation_record = TranslationRecordManager()
try:
logger.info("now looping targets")
for target in CONFIG.Targets:
logger.debug(f"now get files in target {target}")
files = get_files(target)
model: faster_whisper.WhisperModel
model_loaded: bool = False
for video_file in files:
logger.info(f"loop file in target: {target.path}")
# if not have srt file, or configured to overwrite
if need_translation(video_file):
if not model_loaded:
ShareObjects.current_status = "loading"
logger.info(f'loading model: {CONFIG.Whisper.model_name}')
model = faster_whisper.WhisperModel(CONFIG.Whisper.model_name,
device=CONFIG.Whisper.model_device,
compute_type="int8",
download_root=CONFIG.Whisper.model_path)
model_loaded = True
logger.info(f'handle file: {video_file}')
recorder.insert(video_file)
try:
ShareObjects.current_status = "video2audio"
recorder.update_status(video_file, "video2audio", "ffmpeg")
ShareObjects.current_audio = video2audio.video2audio(video_file)
except Exception as e:
logger.warning(e)
recorder.update_status(video_file, "failed", traceback.format_exc())
ShareObjects.reset_current()
continue
try:
logger.info(f'now transcribe {ShareObjects.current_audio}')
ShareObjects.current_srt = get_srt_filepath(video_file)
ShareObjects.current_status = "transcribe"
recorder.update_status(video_file, "transcribe")
writer = srt_writer.SRTWriter(ShareObjects.current_srt)
ts = datetime.datetime.now()
segments, info = model.transcribe(ShareObjects.current_audio, language=target.language,
condition_on_previous_text=True,
vad_filter=False, suppress_blank=False,
max_initial_timestamp=88888,
word_timestamps=True)
logger.info(f"Detected language {info.language} with probability {info.language_probability}")
recorder.update_status(
video_file,
"transcribe",
f"language: {info.language}, {info.language_probability * 100:.2f}%, duration: {info.duration}s")
if CONFIG.Translate.enable is True and CONFIG.Translate.sync is False:
translation_record.insert(ShareObjects.current_srt)
for segment in segments:
logger.debug(f"{video_file} segment: {segment}")
writer.segment_to_srt1(segment)
recorder.update_progress(video_file, segment.start/info.duration)
logger.info(f'srt file for {video_file} generated, {ShareObjects.current_srt}')
# calc time cost
te = datetime.datetime.now()
logger.debug(f"transcribe and translate to srt cost: {(te - ts).total_seconds()} seconds")
recorder.update_status(
video_file,
"success", f"elapse: {(te - ts).total_seconds()}s, total: {info.duration}s")
# clean temp audio file
logger.info(f"now delete tmp audio file {ShareObjects.current_audio}")
remove_file(ShareObjects.current_audio)
# reset status
ShareObjects.reset_current()
except Exception as e:
logger.warning(e)
recorder.update_status(video_file, "failed", traceback.format_exc())
ShareObjects.current_status = "finish"
except Exception as e:
logger.warning(f"run failed {e}")
logger.info(traceback.format_exc())
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
main()