-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
238 lines (214 loc) · 9.33 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import subprocess
import mimetypes
import re
import json
import os
from urllib.parse import urlparse, unquote, quote_plus
# Image codec names (as found in a probed stream's 'codec_name') mapped to the
# subtype appended to 'image/' when a cover picture is extracted.
supported_picture_formats = {
    'mjpeg': 'jpg',
    'png': 'png',
    'jpeg': 'jpg',
    'jpg': 'jpg',
    'gif': 'gif',
    'webp': 'webp',
}

# Formats that need no transcoding: key -> (mime type, rank).
# decode_network_uri prefers the offered extension with the lowest rank;
# decode_local_uri only compares against the mime types.
supported_formats = {
    'mp4': ('video/mp4', 0),
    'webm': ('video/webm', 1),
    'ogg': ('audio/ogg', 2),
    'flac_2': ("audio/flac", 1.5),
    'aac': ("audio/aac", 1.8),
    'flac': ("audio/x-flac", 1.6),
    'mp3': ('audio/mpeg', 3),
    'wav': ('audio/wav', 4),
    'mp4_2': ('audio/mp4', 5),
}
def get_mimetype(filename, ffprobe_cmd=None):
    """Find the container format (mime type) of a media file.

    Up to three sources are consulted, each refining the previous answer:

    1. the filename extension, via ``mimetypes.guess_type``;
    2. the OS ``file --mime-type`` command;
    3. the stream list reported by the probe tool, when ``ffprobe_cmd`` is given.

    Args:
        filename: path of the file to inspect.
        ffprobe_cmd: probe command (e.g. ``"ffprobe"``), or None to skip step 3.

    Returns:
        A mime type string starting with ``video/`` or ``audio/``;
        ``"video/mp4"`` when nothing better could be determined.
    """
    import shlex

    # default value
    mimetype = "video/mp4"

    # guess based on filename extension
    guess = mimetypes.guess_type(filename)[0]
    if guess is not None and guess.lower().startswith(("video/", "audio/")):
        mimetype = guess

    # use the OS file command; passing an argument list (no shell) keeps
    # quotes, "$" or backticks in the filename from being interpreted
    try:
        output = subprocess.check_output(["file", "--mime-type", "-b", filename])
        file_mimetype = output.strip().lower().decode("utf-8")
    except (OSError, ValueError, subprocess.CalledProcessError):
        # best effort: "file" is missing or failed, keep the current guess
        file_mimetype = ""

    if file_mimetype.startswith(("video/", "audio/")):
        mimetype = file_mimetype
        # NOTE(review): this is the original condition, which parses as
        # "(no 'matroska') or (has 'mkv')". If the intent was to always probe
        # Matroska files it needs parentheses - confirm before changing.
        if "matroska" not in mimetype or "mkv" in mimetype:
            return mimetype

    # use ffmpeg/avconv if installed
    if ffprobe_cmd is None:
        return mimetype

    # shlex.split lets ffprobe_cmd carry its own arguments, as the former
    # shell string allowed
    probe_argv = shlex.split(ffprobe_cmd) + [
        "-show_streams", "-show_format",
        "-print_format", "json",
        "-v", "quiet",
        filename,
    ]
    try:
        probe = subprocess.Popen(probe_argv, stdout=subprocess.PIPE)
        report = json.loads(probe.communicate()[0].decode("utf-8"))
        streams = report["streams"]
    except (OSError, ValueError, KeyError):
        # probe could not run or reported no stream list: use what we have
        return mimetype

    has_video = False
    format_name = None
    for stream in streams:
        codec_type = stream.get("codec_type")
        codec_name = stream.get("codec_name")
        if codec_type == "audio":
            # an audio codec only names the format if nothing else has yet
            if not format_name:
                format_name = codec_name
        elif codec_type == "video":
            # picture codecs are embedded cover art, not a real video track
            if codec_name not in supported_picture_formats:
                has_video = True
                format_name = codec_name

    # use the default if it isn't possible to identify the format type
    if format_name is None:
        return mimetype

    prefix = "video/" if has_video else "audio/"
    if "mp4" in format_name:
        return prefix + "mp4"
    if "mpeg4" in format_name:
        return prefix + "mpeg4"
    if "flv" in format_name:
        return "video/flv"
    if "aac" in format_name:
        return "audio/aac"
    if "webm" in format_name:
        return prefix + "webm"
    if "ogg" in format_name:
        return prefix + "ogg"
    if "mp3" in format_name:
        return "audio/mpeg"
    if "wav" in format_name:
        return "audio/wav"
    if "flac" in format_name:
        return "audio/flac"
    return prefix + "mp4"
def get_transcoder_cmds(preferred_transcoder=None):
    """Pick the transcoder and probe commands based on what is installed.

    ffmpeg is tried first unless ``preferred_transcoder`` is exactly
    ``"avconv"``, in which case avconv is tried first; the other tool is the
    fallback either way.

    Returns:
        A ``(transcoder_cmd, probe_cmd)`` pair such as ``("ffmpeg", "ffprobe")``,
        or ``(None, None)`` when neither tool is installed.
    """
    # both tools are always checked, ffmpeg first
    installed = {
        "ffmpeg": is_transcoder_installed("ffmpeg"),
        "avconv": is_transcoder_installed("avconv"),
    }
    probe_for = {"ffmpeg": "ffprobe", "avconv": "avprobe"}

    if preferred_transcoder == "avconv":
        preference = ("avconv", "ffmpeg")
    else:
        preference = ("ffmpeg", "avconv")

    for candidate in preference:
        if installed[candidate]:
            return candidate, probe_for[candidate]
    return None, None
def is_transcoder_installed(transcoder_application):
    """Check whether a transcoder such as ffmpeg or avconv can be run.

    Runs ``<transcoder_application> -version`` and reports whether it worked.

    Args:
        transcoder_application: executable name or path to try.

    Returns:
        True if the command started and exited with status 0; False if it
        could not be started or exited with a non-zero status.
    """
    try:
        subprocess.check_output([transcoder_application, "-version"])
    except (OSError, subprocess.CalledProcessError):
        # OSError: no such executable; CalledProcessError: it is there but
        # fails to run, so it cannot be used either
        return False
    return True
def decode_network_uri(url):
    """Resolve a network URL to a direct media URL using youtube-dl.

    ``youtube-dl -j`` is asked for a JSON description of the URL. When it
    lists several formats, the offered extension with the lowest rank in
    ``supported_formats`` is chosen (falling back to mp4) and
    ``youtube-dl -f <ext> -g`` is asked for the media URL. Otherwise the
    ``url`` and ``ext`` fields of the description are used directly.

    Args:
        url: the address to resolve.

    Returns:
        ``(media_url, False, mime, False, None, None, None)`` on success, or
        None when youtube-dl cannot be run, its output cannot be used, or no
        media URL was produced.
    """
    try:
        # "--" keeps a url that starts with "-" from being read as an option
        proc = subprocess.Popen(['youtube-dl', '-j', '--', url], stdout=subprocess.PIPE)
        info = json.loads(proc.communicate()[0].decode('utf-8'))

        if 'formats' in info:
            offered = []
            for fmt in info['formats']:
                if fmt['ext'] not in offered:
                    offered.append(fmt['ext'])

            # defaults used when none of the offered extensions is supported
            best_ext = 'mp4'
            mime = 'video/mp4'
            best_rank = 100
            for ext in offered:
                if ext in supported_formats and best_rank > supported_formats[ext][1]:
                    mime, best_rank = supported_formats[ext]
                    best_ext = ext

            proc = subprocess.Popen(['youtube-dl', '-f', best_ext, '-g', '--', url],
                                    stdout=subprocess.PIPE)
            # the URL is printed followed by a newline; do not pass that on
            media_url = proc.communicate()[0].decode('utf-8').strip()
        else:
            media_url = info['url']
            mime = supported_formats[info['ext']][0]

        if media_url:
            return (media_url, False, mime, False, None, None, None)
        return None
    except Exception:
        # deliberate best effort: any failure means "cannot decode this url"
        return None
def decode_local_uri(uri, transcoder, probe, preferred_transcoder):
    """Describe the local media file that a URI points to.

    Args:
        uri: URI whose (percent-decoded) path component names the file.
        transcoder: transcoder command, or a false value if none is available.
        probe: probe command handed to get_mimetype (may be None).
        preferred_transcoder: forwarded to get_metadata.

    Returns:
        None when the path does not exist. Otherwise the tuple
        ``(path, True, mime, transcode, metadata, thumb, image_mime)`` where
        ``transcode`` is ``transcoder`` when one is available and the mime
        type is not among ``supported_formats``, else False.
    """
    path = unquote(urlparse(uri).path)

    # check existence first so no external tool is run against a missing file
    if not os.path.exists(path):
        return None

    mime = get_mimetype(path, probe)
    plays_directly = any(mime == known_mime for known_mime, _rank in supported_formats.values())
    transcode = transcoder if transcoder and not plays_directly else False

    metadata, thumb, image_mime = get_metadata(path, mime, preferred_transcoder)
    return (path, True, mime, transcode, metadata, thumb, image_mime)
def _probe_media_info(probe, filename):
    """Run the probe tool on *filename* and return its parsed JSON report."""
    proc = subprocess.Popen(
        [probe, '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', filename],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return json.loads(proc.communicate()[0].decode('utf-8'))


def _leading_number(value):
    """Return the integer before any "/" in *value* ("03/12" -> 3), or None.

    Values that are empty or consist only of zeros yield None.
    """
    try:
        return int(value.split("/")[0].lstrip("0"))
    except (AttributeError, ValueError):
        return None


def _audio_metadata_from_tags(tags):
    """Build the audio metadata dict from a mapping of tag names to values."""
    metadata = {'metadataType': 3}
    for key, value in tags.items():
        def has(pattern):
            return re.search(pattern, key, re.IGNORECASE)

        if has('title'):
            metadata['title'] = value
        # must be tested before the plain 'album' and 'artist' patterns,
        # which also match album-artist keys
        elif has('album[ _]?artist'):
            metadata['albumArtist'] = value
        elif has('album'):
            metadata['albumName'] = value
        elif has('artist'):
            metadata['artist'] = value
        elif has('composer'):
            metadata['composer'] = value
        elif has('track'):
            number = _leading_number(value)
            if number is not None:
                metadata['trackNumber'] = number
        elif has('disc'):
            number = _leading_number(value)
            if number is not None:
                metadata['discNumber'] = number
    return metadata


def _extract_front_cover(trans, filename, streams):
    """Return (image bytes, image mime) of an embedded front cover, or (None, None)."""
    # only the first two streams are considered, and only when the file has
    # more than one stream
    cover_index = None
    if len(streams) > 1:
        for index in (0, 1):
            if streams[index].get('codec_type') == 'video':
                cover_index = index
                break
    if cover_index is None:
        return None, None

    cover = streams[cover_index]
    if cover.get('tags', {}).get('comment') != 'Cover (front)':
        return None, None
    codec = cover.get('codec_name')
    if codec not in supported_picture_formats:
        return None, None

    # copy the picture stream unchanged to stdout
    proc = subprocess.Popen(
        [trans, '-i', filename, '-v', 'quiet', '-vcodec', 'copy', '-f', codec, 'pipe:1'],
        stdout=subprocess.PIPE)
    return proc.communicate()[0], 'image/' + supported_picture_formats[codec]


def get_metadata(filename, mime, preferred_transcoder):
    """Get metadata from a local file, including a cover thumbnail for audio.

    Args:
        filename: path of the media file.
        mime: its mime type; only ``audio/...`` and ``video/...`` are handled.
        preferred_transcoder: forwarded to get_transcoder_cmds.

    Returns:
        A ``(metadata, thumb, image_mime)`` tuple.

        * audio: ``metadata`` has ``metadataType`` 3 plus any of ``title``,
          ``albumName``, ``albumArtist``, ``artist``, ``composer``,
          ``trackNumber``, ``discNumber`` found in the format tags; ``thumb``
          and ``image_mime`` describe an embedded front cover, else None.
        * video: ``metadata`` has ``metadataType`` 1 plus ``title`` if tagged;
          ``thumb`` and ``image_mime`` are None.
        * ``(None, None, None)`` when no probe tool is installed or the mime
          type is neither audio nor video.

    Raises:
        ValueError: if the probe tool's output is not valid JSON.
        OSError: if the probe or transcoder command cannot be started.
    """
    trans, probe = get_transcoder_cmds(preferred_transcoder=preferred_transcoder)
    if not probe:
        return None, None, None

    if mime.startswith("audio/"):
        info = _probe_media_info(probe, filename)
        metadata = _audio_metadata_from_tags(info.get('format', {}).get('tags', {}))
        thumb, image_mime = _extract_front_cover(trans, filename, info.get('streams', []))
        return metadata, thumb, image_mime

    if mime.startswith("video/"):
        info = _probe_media_info(probe, filename)
        metadata = {'metadataType': 1}
        for key, value in info.get('format', {}).get('tags', {}).items():
            if re.search('title', key, re.IGNORECASE):
                metadata['title'] = value
        return metadata, None, None

    # keep the three-value shape that callers unpack
    return None, None, None